提交 341f813a 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/main' into fix/TS-2770

...@@ -116,6 +116,7 @@ int32_t* taosGetErrno(); ...@@ -116,6 +116,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) //
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) //
......
...@@ -89,7 +89,7 @@ else ...@@ -89,7 +89,7 @@ else
${build_dir}/bin/tdengine-datasource.zip \ ${build_dir}/bin/tdengine-datasource.zip \
${build_dir}/bin/tdengine-datasource.zip.md5sum" ${build_dir}/bin/tdengine-datasource.zip.md5sum"
[ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx" [ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx"
explorer_bin_files=$(sh -c "ls ${build_dir}/bin/*-explorer") explorer_bin_files=$(find ${build_dir}/bin/ -name '*-explorer')
bin_files="${build_dir}/bin/${serverName} \ bin_files="${build_dir}/bin/${serverName} \
${build_dir}/bin/${clientName} \ ${build_dir}/bin/${clientName} \
......
...@@ -192,8 +192,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp ...@@ -192,8 +192,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
taosWriteQitem(pVnode->pFetchQ, pMsg); taosWriteQitem(pVnode->pFetchQ, pMsg);
break; break;
case WRITE_QUEUE: case WRITE_QUEUE:
if (!osDataSpaceAvailable()) { if (!osDataSpaceSufficient()) {
terrno = TSDB_CODE_NO_DISKSPACE; terrno = TSDB_CODE_NO_ENOUGH_DISKSPACE;
code = terrno; code = terrno;
dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); dError("vgId:%d, msg:%p put into vnode-write queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
break; break;
......
...@@ -2,8 +2,9 @@ aux_source_directory(src MNODE_SRC) ...@@ -2,8 +2,9 @@ aux_source_directory(src MNODE_SRC)
IF (TD_PRIVILEGE) IF (TD_PRIVILEGE)
ADD_DEFINITIONS(-D_PRIVILEGE) ADD_DEFINITIONS(-D_PRIVILEGE)
ENDIF () ENDIF ()
IF (TD_PRIVILEGE) IF (TD_ENTERPRISE)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c) LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/privilege/src/privilege.c)
LIST(APPEND MNODE_SRC ${TD_ENTERPRISE_DIR}/src/plugins/mnode/src/mndDb.c)
ENDIF () ENDIF ()
add_library(mnode STATIC ${MNODE_SRC}) add_library(mnode STATIC ${MNODE_SRC})
......
...@@ -33,6 +33,8 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb); ...@@ -33,6 +33,8 @@ bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
SSdbRaw *mndDbActionEncode(SDbObj *pDb); SSdbRaw *mndDbActionEncode(SDbObj *pDb);
const char *mndGetDbStr(const char *src); const char *mndGetDbStr(const char *src);
int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -41,12 +41,15 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); ...@@ -41,12 +41,15 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static int32_t mndProcessDropDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
static int32_t mndProcessUseDbReq(SRpcMsg *pReq); static int32_t mndProcessUseDbReq(SRpcMsg *pReq);
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq);
static int32_t mndProcessTrimDbReq(SRpcMsg *pReq); static int32_t mndProcessTrimDbReq(SRpcMsg *pReq);
static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity); static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq); static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq);
#ifndef TD_ENTERPRISE
int32_t mndProcessCompactDbReq(SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; }
#endif
int32_t mndInitDb(SMnode *pMnode) { int32_t mndInitDb(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
.sdbType = SDB_DB, .sdbType = SDB_DB,
...@@ -1395,98 +1398,6 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, ...@@ -1395,98 +1398,6 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return 0; return 0;
} }
static int32_t mndSetCompactDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
SDbObj dbObj = {0};
memcpy(&dbObj, pDb, sizeof(SDbObj));
dbObj.compactStartTime = compactTs;
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndSetCompactDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int64_t compactTs) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (mndVgroupInDb(pVgroup, pDb->uid)) {
if (mndBuildCompactVgroupAction(pMnode, pTrans, pDb, pVgroup, compactTs) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
}
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndCompactDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int64_t compactTs = taosGetTimestampMs();
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "compact-db");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to compact db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetCompactDbCommitLogs(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
if (mndSetCompactDbRedoActions(pMnode, pTrans, pDb, compactTs) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SDbObj *pDb = NULL;
SCompactDbReq compactReq = {0};
if (tDeserializeSCompactDbReq(pReq->pCont, pReq->contLen, &compactReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("db:%s, start to compact", compactReq.db);
pDb = mndAcquireDb(pMnode, compactReq.db);
if (pDb == NULL) {
goto _OVER;
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
goto _OVER;
}
code = mndCompactDb(pMnode, pReq, pDb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("db:%s, failed to process compact db req since %s", compactReq.db, terrstr());
}
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
......
...@@ -344,8 +344,8 @@ static int32_t mndInitWal(SMnode *pMnode) { ...@@ -344,8 +344,8 @@ static int32_t mndInitWal(SMnode *pMnode) {
.fsyncPeriod = 0, .fsyncPeriod = 0,
.rollPeriod = -1, .rollPeriod = -1,
.segSize = -1, .segSize = -1,
.retentionPeriod = -1, .retentionPeriod = 0,
.retentionSize = -1, .retentionSize = 0,
.level = TAOS_WAL_FSYNC, .level = TAOS_WAL_FSYNC,
}; };
...@@ -370,7 +370,6 @@ static int32_t mndInitSdb(SMnode *pMnode) { ...@@ -370,7 +370,6 @@ static int32_t mndInitSdb(SMnode *pMnode) {
opt.path = pMnode->path; opt.path = pMnode->path;
opt.pMnode = pMnode; opt.pMnode = pMnode;
opt.pWal = pMnode->pWal; opt.pWal = pMnode->pWal;
opt.sync = pMnode->syncMgmt.sync;
pMnode->pSdb = sdbInit(&opt); pMnode->pSdb = sdbInit(&opt);
if (pMnode->pSdb == NULL) { if (pMnode->pSdb == NULL) {
...@@ -552,16 +551,7 @@ void mndPreClose(SMnode *pMnode) { ...@@ -552,16 +551,7 @@ void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
syncPreStop(pMnode->syncMgmt.sync); syncPreStop(pMnode->syncMgmt.sync);
#if 0 sdbWriteFile(pMnode->pSdb, 0);
while (syncSnapshotRecving(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is recving");
taosMsleep(300);
}
while (syncSnapshotSending(pMnode->syncMgmt.sync)) {
mInfo("vgId:1, snapshot is sending");
taosMsleep(300);
}
#endif
} }
} }
......
...@@ -118,12 +118,12 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta ...@@ -118,12 +118,12 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
transId, pTrans->createdTime, pMgmt->transId); transId, pTrans->createdTime, pMgmt->transId);
mndTransExecute(pMnode, pTrans, false); mndTransExecute(pMnode, pTrans, false);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
} else { } else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
} }
} }
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
return 0; return 0;
} }
...@@ -319,6 +319,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -319,6 +319,7 @@ int32_t mndInitSync(SMnode *pMnode) {
mError("failed to open sync since %s", terrstr()); mError("failed to open sync since %s", terrstr());
return -1; return -1;
} }
pMnode->pSdb->sync = pMgmt->sync;
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0; return 0;
......
...@@ -1645,8 +1645,6 @@ void mndTransPullup(SMnode *pMnode) { ...@@ -1645,8 +1645,6 @@ void mndTransPullup(SMnode *pMnode) {
} }
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
......
...@@ -37,7 +37,7 @@ extern "C" { ...@@ -37,7 +37,7 @@ extern "C" {
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on // clang-format on
#define SDB_WRITE_DELTA 20 #define SDB_WRITE_DELTA 2000
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ #define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
{ \ { \
......
...@@ -53,7 +53,6 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -53,7 +53,6 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
pSdb->pWal = pOption->pWal; pSdb->pWal = pOption->pWal;
pSdb->sync = pOption->sync;
pSdb->applyIndex = -1; pSdb->applyIndex = -1;
pSdb->applyTerm = -1; pSdb->applyTerm = -1;
pSdb->applyConfig = -1; pSdb->applyConfig = -1;
......
...@@ -472,10 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { ...@@ -472,10 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex, 0); if (pSdb->sync > 0) {
if (pSdb->sync == 0) {
code = 0;
} else {
code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex); code = syncBeginSnapshot(pSdb->sync, pSdb->applyIndex);
} }
} }
...@@ -484,11 +481,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { ...@@ -484,11 +481,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
} }
if (code == 0) { if (code == 0) {
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
// code = walEndSnapshot(pSdb->pWal); if (pSdb->sync > 0) {
if (pSdb->sync == 0) {
code = 0;
} else {
code = syncEndSnapshot(pSdb->sync); code = syncEndSnapshot(pSdb->sync);
} }
} }
......
...@@ -14,7 +14,6 @@ target_sources( ...@@ -14,7 +14,6 @@ target_sources(
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c"
"src/vnd/vnodeRetention.c" "src/vnd/vnodeRetention.c"
# meta # meta
...@@ -53,7 +52,6 @@ target_sources( ...@@ -53,7 +52,6 @@ target_sources(
"src/tsdb/tsdbCacheRead.c" "src/tsdb/tsdbCacheRead.c"
"src/tsdb/tsdbRetention.c" "src/tsdb/tsdbRetention.c"
"src/tsdb/tsdbDiskData.c" "src/tsdb/tsdbDiskData.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbMergeTree.c" "src/tsdb/tsdbMergeTree.c"
"src/tsdb/tsdbDataIter.c" "src/tsdb/tsdbDataIter.c"
...@@ -69,10 +67,20 @@ target_sources( ...@@ -69,10 +67,20 @@ target_sources(
"src/tq/tqSnapshot.c" "src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c" "src/tq/tqOffsetSnapshot.c"
) )
IF (TD_VNODE_PLUGINS)
target_sources(
vnode
PRIVATE
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/tsdbCompact.c
${TD_ENTERPRISE_DIR}/src/plugins/vnode/src/vnodeCompact.c
)
ENDIF ()
target_include_directories( target_include_directories(
vnode vnode
PUBLIC "inc" PUBLIC "inc"
PRIVATE "src/inc" PUBLIC "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
) )
target_link_libraries( target_link_libraries(
......
...@@ -106,10 +106,6 @@ int32_t vnodeSyncCommit(SVnode* pVnode); ...@@ -106,10 +106,6 @@ int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode);
// vnodeCompact.c
int32_t vnodeAsyncCompact(SVnode* pVnode);
int32_t vnodeSyncCompact(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); int32_t vnodeSyncStart(SVnode* pVnode);
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
extern int32_t tsdbCommitCompact(STsdb *pTsdb);
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SCompactInfo *pInfo = (SCompactInfo *)param;
SVnode *pVnode = pInfo->pVnode;
// do compact
code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
// end compact
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeCommitInfo(dir);
tsdbCommitCompact(pVnode->pTsdb);
_exit:
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
tsem_wait(&pVnode->canCommit);
pInfo->pVnode = pVnode;
pInfo->flag = 0;
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
SVnodeInfo info = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &info) < 0) {
code = terrno;
goto _exit;
}
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code),
pVnode->state.commitID);
} else {
vDebug("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pVnode), __func__, pVnode->state.commitID);
}
return code;
}
int32_t vnodeAsyncCompact(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
vnodeAsyncCommit(pVnode);
code = vnodePrepareCompact(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeCompactTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
int32_t vnodeSyncCompact(SVnode *pVnode) {
vnodeAsyncCompact(pVnode);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
return 0;
}
\ No newline at end of file
...@@ -1641,17 +1641,13 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p ...@@ -1641,17 +1641,13 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
extern int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SCompactVnodeReq req = {0}; return vnodeProcessCompactVnodeReqImpl(pVnode, version, pReq, len, pRsp);
if (tDeserializeSCompactVnodeReq(pReq, len, &req) != 0) { }
terrno = TSDB_CODE_INVALID_MSG;
return TSDB_CODE_INVALID_MSG;
}
vInfo("vgId:%d, compact msg will be processed, db:%s dbUid:%" PRId64 " compactStartTime:%" PRId64, TD_VID(pVnode),
req.db, req.dbUid, req.compactStartTime);
vnodeAsyncCompact(pVnode);
vnodeBegin(pVnode);
return 0; #ifndef TD_ENTERPRISE
} int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
#endif
...@@ -94,6 +94,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") ...@@ -94,6 +94,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册