From 0c1cde9cfc3e884a32c65b80f2cfa3b71b52f58c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 22 Apr 2022 09:42:31 +0000 Subject: [PATCH] refact meta 8 --- source/common/src/tmsg.c | 6 +- source/dnode/vnode/src/inc/meta.h | 10 +-- source/dnode/vnode/src/meta/metaEntry.c | 2 +- source/dnode/vnode/src/meta/metaTable.c | 98 ++++++++++++++++++++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 78 ++++++-------------- 5 files changed, 122 insertions(+), 72 deletions(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4d16524294..175829b83d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3651,10 +3651,12 @@ int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) { } int tEncodeSVCreateTbBatchReq(SCoder *pCoder, const SVCreateTbBatchReq *pReq) { + int32_t nReq = taosArrayGetSize(pReq->pArray); + if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI32v(pCoder, taosArrayGetSize(pReq->pArray)) < 0) return -1; - for (int iReq = 0; iReq < pReq->nReqs; iReq++) { + if (tEncodeI32v(pCoder, nReq) < 0) return -1; + for (int iReq = 0; iReq < nReq; iReq++) { if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1; } diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index dfc81501fb..92c89bf1f6 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -48,6 +48,7 @@ int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); // metaTable ================== int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); +int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); // metaQuery ================== typedef struct SMetaEntryReader SMetaEntryReader; @@ -109,7 +110,6 @@ typedef struct { #define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE -int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); int metaDropTable(SMeta* pMeta, tb_uid_t uid); int metaCommit(SMeta* pMeta); int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); @@ -142,10 +142,10 @@ struct SMetaEntry { SSchema* pSchemaTg; } stbEntry; struct { - int64_t ctime; - int32_t ttlDays; - tb_uid_t suid; - SKVRow pTags; + int64_t ctime; + int32_t ttlDays; + tb_uid_t suid; + const void* pTags; } ctbEntry; struct { int64_t ctime; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index e85c562c27..99fb601dfc 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -86,7 +86,7 @@ int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) { if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; - if (tDecodeBinary(pCoder, pME->ctbEntry.pTags, NULL) < 0) return -1; // (TODO) + if (tDecodeBinary(pCoder, &pME->ctbEntry.pTags, NULL) < 0) return -1; // (TODO) } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index e3ef345fd6..d4f1ac3e99 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -18,6 +18,9 @@ static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version); static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid); +static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME); +static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME); +static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid); int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SSkmDbKey skmDbKey = {0}; @@ -51,6 +54,8 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { // save to table.db if (metaSaveToTbDb(pMeta, version, &me) < 0) goto _err; + // save to schema.db (TODO) + // update uid idx if (metaUpdateUidIdx(pMeta, me.uid, version) < 0) goto _err; @@ -72,20 +77,59 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { return 0; } -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { -#if 0 - if (metaSaveTableToDB(pMeta, pTbCfg) < 0) { - // TODO: handle error - return -1; +int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { + SMetaEntry me = {0}; + + // validate message + if (pReq->type != TSDB_CHILD_TABLE && pReq->type != TSDB_NORMAL_TABLE) { + terrno = TSDB_CODE_INVALID_MSG; + goto _err; } - if (metaSaveTableToIdx(pMeta, pTbCfg) < 0) { - // TODO: handle error - return -1; + // preprocess req + pReq->uid = tGenIdPI64(); + pReq->ctime = taosGetTimestampSec(); + + { + // TODO: validate request (uid and name unique) + // for child table, also check if super table exists + } + + // build SMetaEntry + me.type = pReq->type; + me.uid = pReq->uid; + me.name = pReq->name; + if (me.type == TSDB_CHILD_TABLE) { + me.ctbEntry.ctime = pReq->ctime; + me.ctbEntry.ttlDays = pReq->ttl; + me.ctbEntry.suid = pReq->ctb.suid; + me.ctbEntry.pTags = pReq->ctb.pTag; + } else { + me.ntbEntry.ctime = pReq->ctime; + me.ntbEntry.ttlDays = pReq->ttl; + me.ntbEntry.nCols = pReq->ntb.nCols; + me.ntbEntry.sver = pReq->ntb.sver; + me.ntbEntry.pSchema = pReq->ntb.pSchema; } -#endif + // save table + if (me.type == TSDB_CHILD_TABLE) { + if (metaCreateChildTable(pMeta, version, &me) < 0) { + goto _err; + } + } else { + if (metaCreateNormalTable(pMeta, version, &me) < 0) { + goto _err; + } + } + + metaDebug("vgId:%d table %s uid %" PRId64 " is created", TD_VID(pMeta->pVnode), pReq->name, pReq->uid); return 0; + +_err: + metaError("vgId:%d failed to create table:%s type:%s since %s", TD_VID(pMeta->pVnode), pReq->name, + pReq->type == TSDB_CHILD_TABLE ? "child table" : "normal table", tstrerror(terrno)); + return -1; } int metaDropTable(SMeta *pMeta, tb_uid_t uid) { @@ -152,4 +196,38 @@ static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version) { static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid) { return tdbDbInsert(pMeta->pNameIdx, name, strlen(name) + 1, &uid, sizeof(uid), NULL); -} \ No newline at end of file +} + +static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid) { + STtlIdxKey ttlKey = {.dtime = dtime, .uid = uid}; + return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, NULL); +} + +static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) { + // TODO + return 0; +} + +static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) { + int64_t dtime; + + // save to table.db + if (metaSaveToTbDb(pMeta, version, pME) < 0) return -1; + + // save to schema.db + + // update uid.idx + if (metaUpdateUidIdx(pMeta, pME->uid, version) < 0) return -1; + + // save to name.idx + if (metaUpdateNameIdx(pMeta, pME->name, pME->uid) < 0) return -1; + + // save to pTtlIdx if need + if (pME->ntbEntry.ttlDays > 0) { + dtime = pME->ntbEntry.ctime + pME->ntbEntry.ttlDays * 24 * 60; + + if (metaUpdateTtlIdx(pMeta, dtime, pME->uid) < 0) return -1; + } + + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 21bb6dc44c..ee8e7161ec 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -18,7 +18,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); +static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); @@ -77,7 +77,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_CREATE_TABLE: - if (vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp) < 0) goto _err; + if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_TABLE: if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; @@ -244,67 +244,37 @@ _err: return -1; } -static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) { -#if 0 - SVCreateTbBatchReq vCreateTbBatchReq = {0}; - SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; +static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { + SCoder coder = {0}; + int rcode = 0; + SVCreateTbBatchReq req = {0}; + SVCreateTbReq *pCreateReq; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; - pRsp->code = TSDB_CODE_SUCCESS; - pRsp->pCont = NULL; - pRsp->contLen = 0; - - tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); - int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray); - for (int i = 0; i < reqNum; i++) { - SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); - char tableFName[TSDB_TABLE_FNAME_LEN]; - SMsgHead *pHead = (SMsgHead *)pMsg->pCont; - sprintf(tableFName, "%s.%s", pVnode->config.dbname, pCreateTbReq->name); - - int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); - if (code) { - SVCreateTbRsp rsp; - rsp.code = code; - - taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); - } + // decode + tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER); + if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { + rcode = -1; + terrno = TSDB_CODE_INVALID_MSG; + goto _exit; + } - if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { - // TODO: handle error - vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); - } - // TODO: to encapsule a free API - taosMemoryFree(pCreateTbReq->name); - if (pCreateTbReq->type == TD_SUPER_TABLE) { - // taosMemoryFree(pCreateTbReq->stbCfg.pSchema); - // taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema); - // if (pCreateTbReq->stbCfg.pRSmaParam) { - // taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds); - // taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam); - // } - } else if (pCreateTbReq->type == TD_CHILD_TABLE) { - taosMemoryFree(pCreateTbReq->ctbCfg.pTag); + // loop to create table + for (int iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) { + // TODO: fill request } else { - taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); + // TODO } } - vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); - taosArrayDestroy(vCreateTbBatchReq.pArray); - if (vCreateTbBatchRsp.rspList) { - int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); - void *msg = rpcMallocCont(contLen); - tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); - taosArrayDestroy(vCreateTbBatchRsp.rspList); - - pRsp->pCont = msg; - pRsp->contLen = contLen; - } + // prepare rsp -#endif - return 0; +_exit: + tCoderClear(&coder); + return rcode; } static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { -- GitLab