提交 38e103db 编写于 作者: H Hongze Cheng

refact meta 6

上级 e5e7e713
......@@ -1482,15 +1482,14 @@ typedef struct SVCreateStbReq {
const char* name;
tb_uid_t suid;
int8_t rollup;
int32_t ttl;
int16_t nCols;
int16_t sver;
SSchema* pSchema;
int16_t nTags;
SSchema* pSchemaTg;
SRSmaParam pRSmaParam;
} SVCreateStbReq;
int tEnSizeSVCreateStbReq(const SVCreateStbReq *pReq, int32_t *size);
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
......
......@@ -91,6 +91,20 @@ typedef struct {
ptr; \
})
#define tEncodeSize(E, S, SIZE) \
({ \
SCoder coder = {0}; \
int ret = 0; \
tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \
if ((E)(&coder, S) == 0) { \
SIZE = coder.pos; \
} else { \
ret = -1; \
} \
tCoderClear(&coder); \
ret; \
})
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type);
void tCoderClear(SCoder* pCoder);
......
......@@ -3691,30 +3691,14 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->ast);
}
int tEnSizeSVCreateStbReq(const SVCreateStbReq *pReq, int32_t *size) {
SCoder coder = {0};
tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
if (tEncodeSVCreateStbReq(&coder, pReq) < 0) {
tCoderClear(&coder);
return -1;
}
*size = coder.pos;
tCoderClear(&coder);
return 0;
}
int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->rollup) < 0) return -1;
if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1;
if (tEncodeI16v(pCoder, pReq->nCols) < 0) return -1;
if (tEncodeI16v(pCoder, pReq->sver) < 0) return -1;
for (int iCol = 0; iCol < pReq->nCols; iCol++) {
if (tEncodeSSchema(pCoder, pReq->pSchema + iCol) < 0) return -1;
}
......@@ -3736,9 +3720,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->rollup) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI16v(pCoder, &pReq->nCols) < 0) return -1;
if (tDecodeI16v(pCoder, &pReq->sver) < 0) return -1;
pReq->pSchema = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pReq->nCols);
if (pReq->pSchema == NULL) return -1;
for (int iCol = 0; iCol < pReq->nCols; iCol++) {
......
......@@ -406,8 +406,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.name = (char *)tNameGetTableName(&name);
req.suid = pStb->uid;
req.rollup = pStb->aggregationMethod > -1 ? 1 : 0;
req.ttl = 0;
req.nCols = pStb->numOfColumns;
req.sver = 0; // TODO
req.pSchema = pStb->pColumns;
req.nTags = pStb->numOfTags;
req.pSchemaTg = pStb->pTags;
......@@ -432,7 +432,7 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
}
// get length
if (tEnSizeSVCreateStbReq(&req, &contLen) < 0) {
if (tEncodeSize(tEncodeSVCreateStbReq, &req, contLen) < 0) {
taosMemoryFree(req.pRSmaParam.pFuncIds);
return NULL;
}
......
......@@ -23,6 +23,7 @@ target_sources(
"src/meta/metaTDBImpl.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
# tsdb
"src/tsdb/tsdbTDBImpl.c"
......
......@@ -39,6 +39,12 @@ typedef struct SMSmaCursor SMSmaCursor;
int metaOpen(SVnode* pVnode, SMeta** ppMeta);
int metaClose(SMeta* pMeta);
// metaEntry ==================
typedef struct SMetaEntry SMetaEntry;
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
// metaIdx ==================
int metaOpenIdx(SMeta* pMeta);
void metaCloseIdx(SMeta* pMeta);
......@@ -117,6 +123,34 @@ SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
struct SMetaEntry {
int8_t type;
tb_uid_t uid;
const char* name;
union {
struct {
int16_t nCols;
int16_t sver;
SSchema* pSchema;
int16_t nTags;
SSchema* pSchemaTg;
} stbEntry;
struct {
int64_t ctime;
int32_t ttlDays;
tb_uid_t suid;
SKVRow pTags;
} ctbEntry;
struct {
int64_t ctime;
int32_t ttlDays;
int16_t nCols;
int16_t sver;
SSchema* pSchema;
} ntbEntry;
};
};
#ifndef META_REFACT
// SMetaDB
int metaOpenDB(SMeta* pMeta);
......
......@@ -26,6 +26,7 @@
#include "tcompression.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tencode.h"
#include "tfs.h"
#include "tglobal.h"
#include "tjson.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI8(pCoder, pME->type) < 0) return -1;
if (tEncodeI64(pCoder, pME->uid) < 0) return -1;
if (tEncodeCStr(pCoder, pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
if (tEncodeI16v(pCoder, pME->stbEntry.nCols) < 0) return -1;
if (tEncodeI16v(pCoder, pME->stbEntry.sver) < 0) return -1;
for (int iCol = 0; iCol < pME->stbEntry.nCols; iCol++) {
if (tEncodeSSchema(pCoder, pME->stbEntry.pSchema + iCol) < 0) return -1;
}
if (tEncodeI16v(pCoder, pME->stbEntry.nTags) < 0) return -1;
for (int iTag = 0; iTag < pME->stbEntry.nTags; iTag++) {
if (tEncodeSSchema(pCoder, pME->stbEntry.pSchemaTg + iTag) < 0) return -1;
}
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pME->ctbEntry.pTags, kvRowLen(pME->ctbEntry.pTags)) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI16v(pCoder, pME->ntbEntry.nCols) < 0) return -1;
if (tEncodeI16v(pCoder, pME->ntbEntry.sver) < 0) return -1;
for (int iCol = 0; iCol < pME->ntbEntry.nCols; iCol++) {
if (tEncodeSSchema(pCoder, pME->ntbEntry.pSchema + iCol) < 0) return -1;
}
} else {
ASSERT(0);
}
tEndEncode(pCoder);
return 0;
}
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI8(pCoder, &pME->type) < 0) return -1;
if (tDecodeI64(pCoder, &pME->uid) < 0) return -1;
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeI16v(pCoder, &pME->stbEntry.nCols) < 0) return -1;
if (tDecodeI16v(pCoder, &pME->stbEntry.sver) < 0) return -1;
pME->stbEntry.pSchema = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pME->stbEntry.nCols);
if (pME->stbEntry.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int iCol = 0; iCol < pME->stbEntry.nCols; iCol++) {
if (tDecodeSSchema(pCoder, pME->stbEntry.pSchema + iCol) < 0) return -1;
}
if (tDecodeI16v(pCoder, &pME->stbEntry.nTags) < 0) return -1;
pME->stbEntry.pSchemaTg = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pME->stbEntry.nTags);
if (pME->stbEntry.pSchemaTg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int iTag = 0; iTag < pME->stbEntry.nTags; iTag++) {
if (tDecodeSSchema(pCoder, pME->stbEntry.pSchemaTg + iTag) < 0) return -1;
}
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeBinary(pCoder, pME->ctbEntry.pTags, NULL) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI16v(pCoder, &pME->ntbEntry.nCols) < 0) return -1;
if (tDecodeI16v(pCoder, &pME->ntbEntry.sver) < 0) return -1;
pME->ntbEntry.pSchema = (SSchema *)TCODER_MALLOC(pCoder, sizeof(SSchema) * pME->ntbEntry.nCols);
if (pME->ntbEntry.pSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int iCol = 0; iCol < pME->ntbEntry.nCols; iCol++) {
if (tEncodeSSchema(pCoder, pME->ntbEntry.pSchema + iCol) < 0) return -1;
}
} else {
ASSERT(0);
}
tEndDecode(pCoder);
return 0;
}
......@@ -16,28 +16,80 @@
#include "vnodeInt.h"
int metaCreateSTable(SMeta *pMeta, SVCreateStbReq *pReq, SVCreateStbRsp *pRsp) {
// TODO
STbDbKey tbDbKey = {0};
SSkmDbKey skmDbKey = {0};
SMetaEntry me = {0};
int kLen;
int vLen;
const void *pKey;
const void *pVal;
// check name and uid unique
// set structs
me.type = TSDB_SUPER_TABLE;
me.uid = pReq->suid;
me.name = pReq->name;
me.stbEntry.nCols = pReq->nCols;
me.stbEntry.sver = pReq->sver;
me.stbEntry.pSchema = pReq->pSchema;
me.stbEntry.nTags = pReq->nTags;
me.stbEntry.pSchemaTg = pReq->pSchemaTg;
tbDbKey.uid = pReq->suid;
tbDbKey.ver = 0; // (TODO)
skmDbKey.uid = pReq->suid;
skmDbKey.sver = 0; // (TODO)
// save to table.db (TODO)
pKey = &tbDbKey;
kLen = sizeof(tbDbKey);
pVal = NULL;
vLen = 0;
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
// save to schema.db
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
pVal = NULL;
vLen = 0;
if (tdbDbInsert(pMeta->pSkmDb, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
// update name.idx
pKey = pReq->name;
kLen = strlen(pReq->name) + 1;
pVal = &pReq->suid;
vLen = sizeof(tb_uid_t);
if (tdbDbInsert(pMeta->pNameIdx, pKey, kLen, pVal, vLen, NULL) < 0) {
return -1;
}
return 0;
}
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) {
#ifdef META_REFACT
#else
#if 0
if (metaSaveTableToDB(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
#endif
if (metaSaveTableToIdx(pMeta, pTbCfg) < 0) {
// TODO: handle error
return -1;
}
#endif
return 0;
}
int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
#if 0
if (metaRemoveTableFromIdx(pMeta, uid) < 0) {
// TODO: handle error
return -1;
......@@ -47,6 +99,7 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) {
// TODO
return -1;
}
#endif
return 0;
}
......@@ -15,9 +15,12 @@
#include "vnodeInt.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len);
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
......@@ -50,6 +53,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vTrace("vgId: %d start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version);
pVnode->state.applied = version;
// skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead);
......@@ -63,21 +68,22 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
switch (pMsg->msgType) {
/* META */
case TDMT_VND_CREATE_STB:
ret = vnodeProcessCreateStbReq(pVnode, pReq, len);
if (vnodeProcessCreateStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_ALTER_STB:
vnodeProcessAlterStbReq(pVnode, pReq);
if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_STB:
vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
if (vnodeProcessDropStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_TABLE:
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp);
if (vnodeProcessCreateTbReq(pVnode, pMsg, pReq, pRsp) < 0) goto _err;
break;
case TDMT_VND_ALTER_TABLE:
if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_TABLE:
if (vnodeProcessDropTbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
......@@ -128,7 +134,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
break;
}
pVnode->state.applied = version;
vDebug("vgId: %d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
......@@ -139,6 +145,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
}
return 0;
_err:
vDebug("vgId: %d process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), version);
return -1;
}
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
......@@ -203,30 +214,45 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return 0;
}
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len) {
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq, int len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SCoder coder;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
// decode and process req
tCoderInit(&coder, TD_LITTLE_ENDIAN, pReq, len, TD_DECODER);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
tCoderClear(&coder);
return -1;
pRsp->code = terrno;
goto _err;
}
if (metaCreateSTable(pVnode->pMeta, pReq, NULL) < 0) {
tCoderClear(&coder);
return -1;
pRsp->code = terrno;
goto _err;
}
tCoderClear(&coder);
return 0;
_err:
tCoderClear(&coder);
return -1;
}
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
SVCreateTbBatchReq vCreateTbBatchReq = {0};
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray);
for (int i = 0; i < reqNum; i++) {
......@@ -279,7 +305,9 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
return 0;
}
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
ASSERT(0);
#if 0
SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
......@@ -291,6 +319,25 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
// taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
// }
taosMemoryFree(vAlterTbReq.name);
#endif
return 0;
}
static int vnodeProcessDropStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO
ASSERT(0);
return 0;
}
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO
ASSERT(0);
return 0;
}
static int vnodeProcessDropTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
// TODO
ASSERT(0);
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册