提交 381ff01f 编写于 作者: S Shengliang Guan

feat: add vgroup for tsma

上级 f93c4d08
......@@ -1022,6 +1022,10 @@ typedef struct {
SReplica replicas[TSDB_MAX_REPLICA];
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
// for tsma
int8_t isTsma;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......
......@@ -197,12 +197,14 @@ static const SSysDbTableSchema vgroupsSchema[] = {
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT},
};
static const SSysDbTableSchema smaSchema[] = {
{.name = "sma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "stable_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
};
static const SSysDbTableSchema transSchema[] = {
......
......@@ -2908,6 +2908,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2970,6 +2972,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
}
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......
......@@ -183,7 +183,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, create vnode req is received", createReq.vgId);
dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma);
SVnodeCfg vnodeCfg = {0};
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
......
......@@ -328,6 +328,7 @@ typedef struct {
int64_t compStorage;
int64_t pointsWritten;
int8_t compact;
int8_t isTsma;
int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
} SVgObj;
......
......@@ -30,6 +30,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
......
......@@ -1155,7 +1155,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (NULL == pDb || pVgroup->dbUid == pDb->uid) {
if ((NULL == pDb || pVgroup->dbUid == pDb->uid) && !pVgroup->isTsma) {
SVgroupInfo vgInfo = {0};
vgInfo.vgId = pVgroup->vgId;
vgInfo.hashBegin = pVgroup->hashBegin;
......
......@@ -354,6 +354,22 @@ static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj
return 0;
}
static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
......@@ -393,6 +409,34 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
// todo add sma info here
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_VNODE;
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreate, SDbObj *pDb, SStbObj *pStb) {
SSmaObj smaObj = {0};
memcpy(smaObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
......@@ -438,6 +482,14 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
}
SVgObj smaVgObj = {0};
if (mndAllocSmaVgroup(pMnode, pDb, &smaVgObj) != 0) {
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
return -1;
}
smaObj.dstVgId = smaVgObj.vgId;
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
......@@ -460,8 +512,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetDbInfo(pTrans, pDb);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &smaVgObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -480,7 +535,6 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) {
if (pCreate->intervalUnit < 0) return -1;
if (pCreate->slidingUnit < 0) return -1;
if (pCreate->timezone < 0) return -1;
if (pCreate->dstVgId < 0) return -1;
if (pCreate->interval < 0) return -1;
if (pCreate->offset < 0) return -1;
if (pCreate->sliding < 0) return -1;
......@@ -602,6 +656,24 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *
return 0;
}
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPING) != 0) return -1;
return 0;
}
static int32_t mndSetDropSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
......@@ -643,23 +715,59 @@ static int32_t mndSetDropSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
return 0;
}
static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_DROP_VNODE;
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq);
SVgObj *pVgroup = NULL;
STrans *pTrans = NULL;
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbInfo(pTrans, pDb);
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
mndReleaseVgroup(pMnode, pVgroup);
return code;
}
......@@ -846,6 +954,9 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->dstVgId, false);
numOfRows++;
sdbRelease(pSdb, pSma);
}
......
......@@ -80,6 +80,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid, _OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->isTsma, _OVER)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
......@@ -127,6 +128,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pVgroup->hashEnd, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pVgroup->dbUid, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->isTsma, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pVgroup->replica, _OVER)
for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
......@@ -167,6 +169,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pOld->hashBegin = pNew->hashBegin;
pOld->hashEnd = pNew->hashEnd;
pOld->replica = pNew->replica;
pOld->isTsma = pNew->isTsma;
memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0;
}
......@@ -426,6 +429,25 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
return 0;
}
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
SArray *pArray = mndBuildDnodesArray(pMnode);
if (pArray == NULL) return -1;
pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
pVgroup->isTsma = 1;
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroup->createdTime;
pVgroup->version = 1;
memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN);
pVgroup->dbUid = pDb->uid;
pVgroup->replica = 1;
if (mndGetAvailableDnode(pMnode, pVgroup, pArray) != 0) return -1;
mInfo("db:%s, sma vgId:%d is alloced", pDb->name, pVgroup->vgId);
return 0;
}
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
int32_t code = -1;
SArray *pArray = NULL;
......@@ -705,6 +727,9 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppendNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
numOfRows++;
sdbRelease(pSdb, pVgroup);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册