提交 0c1cde9c 编写于 作者: H Hongze Cheng

refact meta 8

上级 a769c57f
...@@ -3651,10 +3651,12 @@ int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) { ...@@ -3651,10 +3651,12 @@ int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
} }
int tEncodeSVCreateTbBatchReq(SCoder *pCoder, const SVCreateTbBatchReq *pReq) { int tEncodeSVCreateTbBatchReq(SCoder *pCoder, const SVCreateTbBatchReq *pReq) {
int32_t nReq = taosArrayGetSize(pReq->pArray);
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, taosArrayGetSize(pReq->pArray)) < 0) return -1; if (tEncodeI32v(pCoder, nReq) < 0) return -1;
for (int iReq = 0; iReq < pReq->nReqs; iReq++) { for (int iReq = 0; iReq < nReq; iReq++) {
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1; if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
} }
......
...@@ -48,6 +48,7 @@ int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); ...@@ -48,6 +48,7 @@ int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
// metaTable ================== // metaTable ==================
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
// metaQuery ================== // metaQuery ==================
typedef struct SMetaEntryReader SMetaEntryReader; typedef struct SMetaEntryReader SMetaEntryReader;
...@@ -109,7 +110,6 @@ typedef struct { ...@@ -109,7 +110,6 @@ typedef struct {
#define META_CHILD_TABLE TD_CHILD_TABLE #define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
int metaDropTable(SMeta* pMeta, tb_uid_t uid); int metaDropTable(SMeta* pMeta, tb_uid_t uid);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
...@@ -142,10 +142,10 @@ struct SMetaEntry { ...@@ -142,10 +142,10 @@ struct SMetaEntry {
SSchema* pSchemaTg; SSchema* pSchemaTg;
} stbEntry; } stbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
tb_uid_t suid; tb_uid_t suid;
SKVRow pTags; const void* pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
......
...@@ -86,7 +86,7 @@ int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) { ...@@ -86,7 +86,7 @@ int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeBinary(pCoder, pME->ctbEntry.pTags, NULL) < 0) return -1; // (TODO) if (tDecodeBinary(pCoder, &pME->ctbEntry.pTags, NULL) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
......
...@@ -18,6 +18,9 @@ ...@@ -18,6 +18,9 @@
static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, int64_t version, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version); static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version);
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid); static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid);
static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME);
static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME);
static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid);
int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SSkmDbKey skmDbKey = {0}; SSkmDbKey skmDbKey = {0};
...@@ -51,6 +54,8 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -51,6 +54,8 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// save to table.db // save to table.db
if (metaSaveToTbDb(pMeta, version, &me) < 0) goto _err; if (metaSaveToTbDb(pMeta, version, &me) < 0) goto _err;
// save to schema.db (TODO)
// update uid idx // update uid idx
if (metaUpdateUidIdx(pMeta, me.uid, version) < 0) goto _err; if (metaUpdateUidIdx(pMeta, me.uid, version) < 0) goto _err;
...@@ -72,20 +77,59 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { ...@@ -72,20 +77,59 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
return 0; return 0;
} }
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
#if 0 SMetaEntry me = {0};
if (metaSaveTableToDB(pMeta, pTbCfg) < 0) {
// TODO: handle error // validate message
return -1; if (pReq->type != TSDB_CHILD_TABLE && pReq->type != TSDB_NORMAL_TABLE) {
terrno = TSDB_CODE_INVALID_MSG;
goto _err;
} }
if (metaSaveTableToIdx(pMeta, pTbCfg) < 0) { // preprocess req
// TODO: handle error pReq->uid = tGenIdPI64();
return -1; pReq->ctime = taosGetTimestampSec();
{
// TODO: validate request (uid and name unique)
// for child table, also check if super table exists
}
// build SMetaEntry
me.type = pReq->type;
me.uid = pReq->uid;
me.name = pReq->name;
if (me.type == TSDB_CHILD_TABLE) {
me.ctbEntry.ctime = pReq->ctime;
me.ctbEntry.ttlDays = pReq->ttl;
me.ctbEntry.suid = pReq->ctb.suid;
me.ctbEntry.pTags = pReq->ctb.pTag;
} else {
me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.nCols = pReq->ntb.nCols;
me.ntbEntry.sver = pReq->ntb.sver;
me.ntbEntry.pSchema = pReq->ntb.pSchema;
} }
#endif
// save table
if (me.type == TSDB_CHILD_TABLE) {
if (metaCreateChildTable(pMeta, version, &me) < 0) {
goto _err;
}
} else {
if (metaCreateNormalTable(pMeta, version, &me) < 0) {
goto _err;
}
}
metaDebug("vgId:%d table %s uid %" PRId64 " is created", TD_VID(pMeta->pVnode), pReq->name, pReq->uid);
return 0; return 0;
_err:
metaError("vgId:%d failed to create table:%s type:%s since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->type == TSDB_CHILD_TABLE ? "child table" : "normal table", tstrerror(terrno));
return -1;
} }
int metaDropTable(SMeta *pMeta, tb_uid_t uid) { int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
...@@ -152,4 +196,38 @@ static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version) { ...@@ -152,4 +196,38 @@ static int metaUpdateUidIdx(SMeta *pMeta, tb_uid_t uid, int64_t version) {
static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid) { static int metaUpdateNameIdx(SMeta *pMeta, const char *name, tb_uid_t uid) {
return tdbDbInsert(pMeta->pNameIdx, name, strlen(name) + 1, &uid, sizeof(uid), NULL); return tdbDbInsert(pMeta->pNameIdx, name, strlen(name) + 1, &uid, sizeof(uid), NULL);
} }
\ No newline at end of file
static int metaUpdateTtlIdx(SMeta *pMeta, int64_t dtime, tb_uid_t uid) {
STtlIdxKey ttlKey = {.dtime = dtime, .uid = uid};
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, NULL);
}
static int metaCreateChildTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) {
// TODO
return 0;
}
static int metaCreateNormalTable(SMeta *pMeta, int64_t version, SMetaEntry *pME) {
int64_t dtime;
// save to table.db
if (metaSaveToTbDb(pMeta, version, pME) < 0) return -1;
// save to schema.db
// update uid.idx
if (metaUpdateUidIdx(pMeta, pME->uid, version) < 0) return -1;
// save to name.idx
if (metaUpdateNameIdx(pMeta, pME->name, pME->uid) < 0) return -1;
// save to pTtlIdx if need
if (pME->ntbEntry.ttlDays > 0) {
dtime = pME->ntbEntry.ctime + pME->ntbEntry.ttlDays * 24 * 60;
if (metaUpdateTtlIdx(pMeta, dtime, pME->uid) < 0) return -1;
}
return 0;
}
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
...@@ -77,7 +77,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -77,7 +77,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
if (vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp) < 0) goto _err; if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_ALTER_TABLE: case TDMT_VND_ALTER_TABLE:
if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
...@@ -244,67 +244,37 @@ _err: ...@@ -244,67 +244,37 @@ _err:
return -1; return -1;
} }
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) { static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
#if 0 SCoder coder = {0};
SVCreateTbBatchReq vCreateTbBatchReq = {0}; int rcode = 0;
SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
for (int i = 0; i < reqNum; i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
char tableFName[TSDB_TABLE_FNAME_LEN]; // decode
SMsgHead *pHead = (SMsgHead *)pMsg->pCont; tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER);
sprintf(tableFName, "%s.%s", pVnode->config.dbname, pCreateTbReq->name); if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
rcode = -1;
int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); terrno = TSDB_CODE_INVALID_MSG;
if (code) { goto _exit;
SVCreateTbRsp rsp; }
rsp.code = code;
taosArrayPush(vCreateTbBatchRsp.rspList, &rsp);
}
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // loop to create table
// TODO: handle error for (int iReq = 0; iReq < req.nReqs; iReq++) {
vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); pCreateReq = req.pReqs + iReq;
} if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
// TODO: to encapsule a free API // TODO: fill request
taosMemoryFree(pCreateTbReq->name);
if (pCreateTbReq->type == TD_SUPER_TABLE) {
// taosMemoryFree(pCreateTbReq->stbCfg.pSchema);
// taosMemoryFree(pCreateTbReq->stbCfg.pTagSchema);
// if (pCreateTbReq->stbCfg.pRSmaParam) {
// taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam->pFuncIds);
// taosMemoryFree(pCreateTbReq->stbCfg.pRSmaParam);
// }
} else if (pCreateTbReq->type == TD_CHILD_TABLE) {
taosMemoryFree(pCreateTbReq->ctbCfg.pTag);
} else { } else {
taosMemoryFree(pCreateTbReq->ntbCfg.pSchema); // TODO
} }
} }
vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); // prepare rsp
taosArrayDestroy(vCreateTbBatchReq.pArray);
if (vCreateTbBatchRsp.rspList) {
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
void *msg = rpcMallocCont(contLen);
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
taosArrayDestroy(vCreateTbBatchRsp.rspList);
pRsp->pCont = msg;
pRsp->contLen = contLen;
}
#endif _exit:
return 0; tCoderClear(&coder);
return rcode;
} }
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册