diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ad6077db098b18d2b10d95058831d4f8c25d046a..57654b2db13be9324b8d86eb4b965ede510ce4fb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1233,7 +1233,7 @@ int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq typedef struct { int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; - int64_t reserved[8]; + int64_t compactStartTime; } SCompactVnodeReq; int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 60a673ef9cf25b61e8889f4941802a8500b726a2..a51a711e960b312868e862b26c022408869e3325 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -216,6 +216,7 @@ static const SSysDbTableSchema vgroupsSchema[] = { {.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true}, + {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; static const SSysDbTableSchema smaSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 95625e8d9360946fbcf1432f804ec7841e006dfc..39797e4947e1b3bd1bf5613d195b4e0cd44c30ab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3989,9 +3989,7 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq * if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { - if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; - } + if (tEncodeI64(&encoder, pReq->compactStartTime) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4006,9 +4004,7 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { - if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; - } + if (tDecodeI64(&decoder, &pReq->compactStartTime) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4e93a1d96ee0eb3b70742eee69d892b8cb1dc451..4b2cb087f1cebf986c9dbdf03a039b7dba1764f8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -327,6 +327,7 @@ typedef struct { SDbCfg cfg; SRWLatch lock; int64_t stateTs; + int64_t compactStartTime; } SDbObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 765d4fd33da9fa52266a89e8fc57d247c6494a96..51eb24f40235e21a811ef48a172c75ba09b4f396 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -43,6 +43,7 @@ int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVg int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId, bool force); int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, SArray *pArray); +int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs); void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 8a4f6c2195ba88b1adea293690b1ba209c6c91a8..50ebb2a7dc9ea1a42913c514358b2c00f9ad9102 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -30,7 +30,7 @@ #include "systable.h" #define DB_VER_NUMBER 1 -#define DB_RESERVE_SIZE 54 +#define DB_RESERVE_SIZE 46 static SSdbRaw *mndDbActionEncode(SDbObj *pDb); static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); @@ -128,6 +128,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashPrefix, _OVER) SDB_SET_INT16(pRaw, dataPos, pDb->cfg.hashSuffix, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.tsdbPageSize, _OVER) + SDB_SET_INT64(pRaw, dataPos, pDb->compactStartTime, _OVER) SDB_SET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -217,6 +218,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashPrefix, _OVER) SDB_GET_INT16(pRaw, dataPos, &pDb->cfg.hashSuffix, _OVER) SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.tsdbPageSize, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pDb->compactStartTime, _OVER) SDB_GET_RESERVE(pRaw, dataPos, DB_RESERVE_SIZE, _OVER) taosInitRWLatch(&pDb->lock); @@ -275,6 +277,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { pOld->cfg.replications = pNew->cfg.replications; pOld->cfg.sstTrigger = pNew->cfg.sstTrigger; pOld->cfg.tsdbPageSize = pNew->cfg.tsdbPageSize; + pOld->compactStartTime = pNew->compactStartTime; taosWUnLockLatch(&pOld->lock); return 0; } @@ -1388,7 +1391,63 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndCompactDb(SMnode *pMnode, SDbObj *pDb) { return 0; } +static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) { + SDbObj dbObj = {0}; + memcpy(&dbObj, pDb, sizeof(SDbObj)); + dbObj.compactStartTime = compactTs; + + SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + sdbFreeRaw(pCommitRaw); + return -1; + } + + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; +} + +static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (mndVgroupInDb(pVgroup, pDb->uid)) { + if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + } + + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { + int64_t compactTs = taosGetTimestampMs(); + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db"); + if (pTrans == NULL) goto _OVER; + + mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name); + mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER; + if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = 0; + +_OVER: + mndTransDrop(pTrans); + return code; +} static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -1412,10 +1471,11 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndCompactDb(pMnode, pDb); + code = mndCompactDb(pMnode, pReq, pDb); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: - if (code != 0) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr()); } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 31ab1f3259cd45b0223c5962fbca5f27386da57a..bbc9b8d460b6603c8995f3f0b050ecbd4853342b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -729,6 +729,13 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pDb->compactStartTime <= 0) { + colDataAppendNULL(pColInfo, rows); + } else { + colDataAppend(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false); + } + numOfRows++; sdbRelease(pSdb, pVgroup); } @@ -2066,3 +2073,59 @@ _OVER: } bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } + +static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, + int64_t compactTs) { + SCompactVnodeReq compactReq = {0}; + compactReq.dbUid = pDb->uid; + compactReq.compactStartTime = compactTs; + tstrncpy(compactReq.db, pDb->name, TSDB_DB_FNAME_LEN); + + mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId); + int32_t contLen = tSerializeSCompactVnodeReq(NULL, 0, &compactReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + contLen += sizeof(SMsgHead); + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + tSerializeSCompactVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &compactReq); + *pContLen = contLen; + return pReq; +} + +static int32_t mndAddCompactVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, + int64_t compactTs) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildCompactVnodeReq(pMnode, pDb, pVgroup, &contLen, compactTs); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_COMPACT; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs) { + if (mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) return -1; + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1ca68fd877fcfa0f88ffa4347978f65c00373d01..2eb44636b19843acc4e475769818cdf5f5c8dff4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -30,6 +30,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -308,8 +309,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vnodeBegin(pVnode); goto _exit; case TDMT_VND_COMPACT: - vnodeAsyncCompact(pVnode); - vnodeBegin(pVnode); + vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp); goto _exit; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); @@ -1266,3 +1266,20 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq _err: return code; } + +static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SCompactVnodeReq req = {0}; + if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return TSDB_CODE_INVALID_MSG; + } + vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode), + req.db, req.dbUid, req.compactStartTime); + +#if 0 + vnodeAsyncCompact(pVnode); + vnodeBegin(pVnode); +#endif + + return 0; +} \ No newline at end of file