diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index d7a62f5402defb95da6a4217254e14dbbc6de56c..a36a7513f3dcd683ad4c0d3d79a7259617c8754f 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -205,7 +205,7 @@ struct SColData { int32_t numOfNull; // # of null int32_t numOfValue; // # of vale int32_t nVal; - uint8_t flag; + int8_t flag; uint8_t *pBitMap; int32_t *aOffset; int32_t nData; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 02d076113f6cdab7066740ce071426ad06efb7d4..301b504346d80190653ea420c311e23ef7e7464b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -22,7 +22,7 @@ * us: 3600*1000000*8765*1000 // 1970 + 1000 years * ns: 3600*1000000000*8765*292 // 1970 + 292 years */ -static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; +int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L}; // static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); @@ -60,23 +60,6 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 return 0; } -#if 0 -static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey, - TSKEY now) { - TSKEY rowKey = TD_ROW_KEY(row); - if (rowKey < minKey || rowKey > maxKey) { - tsdbError("vgId:%d, table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 - " maxKey %" PRId64 " row key %" PRId64, - REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, - rowKey); - terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; - return -1; - } - - return 0; -} -#endif - static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowKey, TSKEY minKey, TSKEY maxKey, TSKEY now) { if (rowKey < minKey || rowKey > maxKey) { @@ -89,79 +72,6 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, TSKEY rowK return 0; } -#if 0 -int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { - ASSERT(pMsg != NULL); - // STsdbMeta * pMeta = pTsdb->tsdbMeta; - SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock = NULL; - SSubmitBlkIter blkIter = {0}; - STSRow *row = NULL; - STsdbKeepCfg *pCfg = &pTsdb->keepCfg; - TSKEY now = taosGetTimestamp(pCfg->precision); - TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; - TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; - - terrno = TSDB_CODE_SUCCESS; - // pMsg->length = htonl(pMsg->length); - // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; - - // pBlock->uid = htobe64(pBlock->uid); - // pBlock->suid = htobe64(pBlock->suid); - // pBlock->sversion = htonl(pBlock->sversion); - // pBlock->dataLen = htonl(pBlock->dataLen); - // pBlock->schemaLen = htonl(pBlock->schemaLen); - // pBlock->numOfRows = htonl(pBlock->numOfRows); - -#if 0 - if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { - tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - STable *pTable = pMeta->tables[pBlock->tid]; - if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { - tsdbError("vgId:%d, failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pTsdb), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d, invalid action trying to insert a super table %s", REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pTsdb, pBlock, pTable) < 0) { - if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - continue; - } else { - return -1; - } - } -#endif - tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); - while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { - if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) { - return -1; - } - } - } - - if (terrno != TSDB_CODE_SUCCESS) return -1; - return 0; -} -#endif - int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { int32_t code = 0; STsdbKeepCfg *pCfg = &pTsdb->keepCfg; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 05d29656ef7c2676ce6c210fc3e909f6fce54c45..a34836959ca798cdc0b4686bb2988a74724b2666 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -13,7 +13,11 @@ * along with this program. If not, see . */ +#include "tencode.h" +#include "tmsg.h" #include "vnd.h" +#include "vnode.h" +#include "vnodeInt.h" static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -31,158 +35,254 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); -int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { - int32_t code = 0; +static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) { + int32_t code = 0; + int32_t lino = 0; + + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // flags + if (tDecodeI32v(pCoder, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // name + char *name = NULL; + if (tDecodeCStr(pCoder, &name) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // uid + int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); + if (uid == 0) { + uid = tGenIdPI64(); + } + *(int64_t *)(pCoder->data + pCoder->pos) = uid; + + // ctime + *(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime; + + tEndDecode(pCoder); + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + vTrace("vgId:%d %s done, table:%s uid generated:%" PRId64, TD_VID(pVnode), __func__, name, uid); + if (pUid) *pUid = uid; + } + return code; +} +static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; + + int64_t ctime = taosGetTimestampMs(); SDecoder dc = {0}; + int32_t nReqs; - switch (pMsg->msgType) { - case TDMT_VND_CREATE_TABLE: { - int64_t ctime = taosGetTimestampMs(); - int32_t nReqs; + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + if (tStartDecode(&dc) < 0) { + code = TSDB_CODE_INVALID_MSG; + return code; + } - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } + if (tDecodeI32v(&dc, &nReqs) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + for (int32_t iReq = 0; iReq < nReqs; iReq++) { + code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (tDecodeI32v(&dc, &nReqs) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - for (int32_t iReq = 0; iReq < nReqs; iReq++) { - tb_uid_t uid = tGenIdPI64(); - char *name = NULL; - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + tEndDecode(&dc); - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; +_exit: + tDecoderClear(&dc); + return code; +} +extern int64_t tsMaxKeyByPrecision[]; +static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) { + int32_t code = 0; + int32_t lino = 0; - vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid); - tEndDecode(&dc); - } + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_SUBMIT: { - int64_t ctime = taosGetTimestampMs(); + SSubmitTbData submitTbData; + if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - tStartDecode(&dc); + int64_t uid; + if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid); + TSDB_CHECK_CODE(code, lino, _exit); + } - uint64_t nSubmitTbData; - if (tDecodeU64v(&dc, &nSubmitTbData) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + // submit data + if (tDecodeI64(pCoder, &submitTbData.suid) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } - for (int32_t i = 0; i < nSubmitTbData; i++) { - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { + *(int64_t *)(pCoder->data + pCoder->pos) = uid; + pCoder->pos += sizeof(int64_t); + } else { + tDecodeI64(pCoder, &submitTbData.uid); + } - int32_t flags; - if (tDecodeI32v(&dc, &flags) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } + if (tDecodeI32v(pCoder, &submitTbData.sver) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // scan and check + TSKEY now = ctime; + if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) { + now *= 1000; + } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { + now *= 1000000; + } + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2; + TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; + if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + uint64_t nColData; + if (tDecodeU64v(pCoder, &nColData) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } - if (flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { - // SVCreateTbReq - if (tStartDecode(&dc) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - if (tDecodeI32v(&dc, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - char *name = NULL; - if (tDecodeCStr(&dc, &name) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, name); - if (uid == 0) { - uid = tGenIdPI64(); - } - - *(int64_t *)(dc.data + dc.pos) = uid; - *(int64_t *)(dc.data + dc.pos + 8) = ctime; - - tEndDecode(&dc); - - // SSubmitTbData - int64_t suid; - if (tDecodeI64(&dc, &suid) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _err; - } - - *(int64_t *)(dc.data + dc.pos) = uid; - } + SColData colData = {0}; + pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); - tEndDecode(&dc); + for (int32_t iRow = 0; iRow < colData.nVal; iRow++) { + if (((TSKEY *)colData.pData)[iRow] < minKey || ((TSKEY *)colData.pData)[iRow] > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; } + } + } else { + uint64_t nRow; + if (tDecodeU64v(pCoder, &nRow) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } - tEndDecode(&dc); - tDecoderClear(&dc); - } break; - case TDMT_VND_DELETE: { - int32_t size; - int32_t ret; - uint8_t *pCont; - SEncoder *pCoder = &(SEncoder){0}; - SDeleteRes res = {0}; - SReadHandle handle = { - .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - - code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); - if (code) { - goto _err; + for (int32_t iRow = 0; iRow < nRow; ++iRow) { + SRow *pRow = (SRow *)(pCoder->data + pCoder->pos); + pCoder->pos += pRow->len; + + if (pRow->ts < minKey || pRow->ts > maxKey) { + code = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + goto _exit; } + } + } + + tEndDecode(pCoder); - // malloc and encode - tEncodeSize(tEncodeDeleteRes, &res, size, ret); - pCont = rpcMallocCont(size + sizeof(SMsgHead)); +_exit: + return code; +} +static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + int32_t lino = 0; - ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); - ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + SDecoder *pCoder = &(SDecoder){0}; - tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); - tEncodeDeleteRes(pCoder, &res); - tEncoderClear(pCoder); + tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); - rpcFreeCont(pMsg->pCont); - pMsg->pCont = pCont; - pMsg->contLen = size + sizeof(SMsgHead); + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + uint64_t nSubmitTbData; + if (tDecodeU64v(pCoder, &nSubmitTbData) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t ctime = taosGetTimestampMs(); + for (int32_t i = 0; i < nSubmitTbData; i++) { + code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tEndDecode(pCoder); + +_exit: + tDecoderClear(pCoder); + return code; +} + +static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + + int32_t size; + int32_t ret; + uint8_t *pCont; + SEncoder *pCoder = &(SEncoder){0}; + SDeleteRes res = {0}; + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; - taosArrayDestroy(res.uidList); + code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); + if (code) goto _exit; + + // malloc and encode + tEncodeSize(tEncodeDeleteRes, &res, size, ret); + pCont = rpcMallocCont(size + sizeof(SMsgHead)); + + ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); + ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + + tEncoderInit(pCoder, pCont + sizeof(SMsgHead), size); + tEncodeDeleteRes(pCoder, &res); + tEncoderClear(pCoder); + + rpcFreeCont(pMsg->pCont); + pMsg->pCont = pCont; + pMsg->contLen = size + sizeof(SMsgHead); + + taosArrayDestroy(res.uidList); + +_exit: + return code; +} + +int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = 0; + + switch (pMsg->msgType) { + case TDMT_VND_CREATE_TABLE: { + code = vnodePreProcessCreateTableMsg(pVnode, pMsg); + } break; + case TDMT_VND_SUBMIT: { + code = vnodePreProcessSubmitMsg(pVnode, pMsg); + } break; + case TDMT_VND_DELETE: { + code = vnodePreProcessDeleteMsg(pVnode, pMsg); } break; default: break; } - return code; - -_err: - vError("vgId%d, preprocess request failed since %s", TD_VID(pVnode), tstrerror(code)); +_exit: + if (code) { + vError("vgId%d failed to preprocess write request since %s, msg type:%d", TD_VID(pVnode), tstrerror(code), + pMsg->msgType); + } return code; } @@ -875,7 +975,6 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { -#if 1 int32_t code = 0; terrno = 0; @@ -896,12 +995,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } tDecoderClear(&dc); - // check - code = tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq); - if (code) { - goto _exit; - } - for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); @@ -1046,145 +1139,6 @@ _exit: if (code) terrno = code; return code; - -#else - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - SSubmitRsp submitRsp = {0}; - int32_t nRows = 0; - int32_t tsize, ret; - SEncoder encoder = {0}; - SArray *newTbUids = NULL; - SVStatis statis = {0}; - bool tbCreated = false; - terrno = TSDB_CODE_SUCCESS; - - pRsp->code = 0; - pSubmitReq->version = version; - statis.nBatchInsert = 1; - - if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { - pRsp->code = terrno; - goto _exit; - } - - submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp)); - newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t)); - if (!submitRsp.pArray || !newTbUids) { - pRsp->code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - for (;;) { - tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; - - SSubmitBlkRsp submitBlkRsp = {0}; - tbCreated = false; - - // create table for auto create table mode - if (msgIter.schemaLen > 0) { - // tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen); - // if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) { - // pRsp->code = TSDB_CODE_INVALID_MSG; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - // if ((terrno = grantCheck(TSDB_GRANT_TABLE)) < 0) { - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - - if (metaCreateTable(pVnode->pMeta, version, &createTbReq, &submitBlkRsp.pMeta) < 0) { - // if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - // submitBlkRsp.code = terrno; - // pRsp->code = terrno; - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - // goto _exit; - // } - } else { - if (NULL != submitBlkRsp.pMeta) { - vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); - } - - // taosArrayPush(newTbUids, &createTbReq.uid); - - submitBlkRsp.uid = createTbReq.uid; - submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); - sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); - tbCreated = true; - } - - // msgIter.uid = createTbReq.uid; - // if (createTbReq.type == TSDB_CHILD_TABLE) { - // msgIter.suid = createTbReq.ctb.suid; - // } else { - // msgIter.suid = 0; - // } - - // tDecoderClear(&decoder); - // taosArrayDestroy(createTbReq.ctb.tagName); - } - - if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) { - submitBlkRsp.code = terrno; - } - - submitRsp.numOfRows += submitBlkRsp.numOfRows; - submitRsp.affectedRows += submitBlkRsp.affectedRows; - if (tbCreated || submitBlkRsp.code) { - taosArrayPush(submitRsp.pArray, &submitBlkRsp); - } - } - - // if (taosArrayGetSize(newTbUids) > 0) { - // vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), - // (int32_t)taosArrayGetSize(newTbUids)); - // } - - // tqUpdateTbUidList(pVnode->pTq, newTbUids, true); - -_exit: - taosArrayDestroy(newTbUids); - // tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret); - // pRsp->pCont = rpcMallocCont(tsize); - // pRsp->contLen = tsize; - // tEncoderInit(&encoder, pRsp->pCont, tsize); - // tEncodeSSubmitRsp(&encoder, &submitRsp); - // tEncoderClear(&encoder); - - taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); - - // TODO: the partial success scenario and the error case - // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level - // 1/level 2. - // TODO: refactor - if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { - statis.nBatchInsertSuccess = 1; - tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); - } - - // N.B. not strict as the following procedure is not atomic - atomic_add_fetch_64(&pVnode->statis.nInsert, submitRsp.numOfRows); - atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); - atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert); - atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess); - - vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); - return 0; -#endif - return 0; } static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {