提交 65edd329 编写于 作者: H Hongze Cheng

feat: alter super table

上级 9fa5b304
......@@ -383,9 +383,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.suid = pStb->uid;
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
req.schema.nCols = pStb->numOfColumns;
req.schema.sver = 0;
req.schema.sver = pStb->version;
req.schema.pSchema = pStb->pColumns;
req.schemaTag.nCols = pStb->numOfTags;
req.schemaTag.nCols = 0;
req.schemaTag.pSchema = pStb->pTags;
if (req.rollup) {
......
......@@ -77,6 +77,7 @@ int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta);
int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
......
......@@ -131,6 +131,75 @@ _err:
return -1;
}
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TDBC *pUidIdxc = NULL;
TDBC *pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
SDecoder dc = {0};
int32_t ret;
int32_t c;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c) {
ASSERT(0);
return -1;
}
ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
if (ret < 0) {
ASSERT(0);
return -1;
}
oversion = *(int64_t *)pData;
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
ret = tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(ret == 0 && c == 0);
ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ASSERT(ret == 0);
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &oStbEntry);
nStbEntry.version = version;
nStbEntry.type = TSDB_SUPER_TABLE;
nStbEntry.uid = pReq->suid;
nStbEntry.name = pReq->name;
nStbEntry.stbEntry.schema = pReq->schema;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
metaWLock(pMeta);
// compare two entry
if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) {
if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) {
metaSaveToSkmDb(pMeta, &nStbEntry);
}
}
// if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) {
// // change tag schema
// }
// update table.db
metaSaveToTbDb(pMeta, &nStbEntry);
// update uid index
tdbDbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaULock(pMeta);
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
return 0;
}
int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
SMetaEntry me = {0};
SMetaReader mr = {0};
......
......@@ -91,7 +91,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion
*pUid = 0;
int32_t sversion = 0;
int32_t sversion = 1;
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
......
......@@ -465,7 +465,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
pTbData = (STbData *)pNode->pData;
pCommitIter = pCommith->iters + i;
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); // TODO: schema version
pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 1); // TODO: schema version
if (pTSchema) {
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
......@@ -912,7 +912,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
while (bidx < nBlocks) {
if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) {
// Set commit table
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version
pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version
if (!pTSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......
......@@ -490,7 +490,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0);
pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 1);
int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn);
int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData;
......@@ -1618,7 +1618,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if (pSchema1 == NULL) {
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
// TODO: use the real schemaVersion
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1);
}
#ifdef TD_DEBUG_PRINT_ROW
......@@ -1637,7 +1637,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if (pSchema2 == NULL) {
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
// TODO: use the real schemaVersion
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0);
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1);
}
if (isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
......@@ -2755,7 +2755,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
win->ekey = key;
if (rv != TD_ROW_SVER(row)) {
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 1);
rv = TD_ROW_SVER(row);
}
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
......@@ -3889,7 +3889,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
// NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true);
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true);
// no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
......
......@@ -2084,7 +2084,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
......
......@@ -16,7 +16,7 @@
#include "vnd.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
......@@ -72,7 +72,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_ALTER_STB:
if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err;
if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_STB:
if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
......@@ -398,20 +398,32 @@ _exit:
return rcode;
}
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
// ASSERT(0);
#if 0
SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
// TODO: to encapsule a free API
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema);
if (vAlterTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder dc = {0};
pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
tDecoderInit(&dc, pReq, len);
// decode req
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
}
taosMemoryFree(vAlterTbReq.name);
#endif
if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno;
tDecoderClear(&dc);
return -1;
}
tDecoderClear(&dc);
return 0;
}
......@@ -514,7 +526,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
if (pSchema) {
taosMemoryFreeClear(pSchema);
}
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema
pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1); // TODO: use the real schema
if (pSchema) {
suid = msgIter->suid;
}
......
......@@ -3688,7 +3688,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
req.type = TD_NORMAL_TABLE;
req.name = strdup(pStmt->tableName);
req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols);
req.ntb.schema.sver = 0;
req.ntb.schema.sver = 1;
req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema));
if (NULL == req.name || NULL == req.ntb.schema.pSchema) {
destroyCreateTbReq(&req);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册