提交 3a40a6d3 编写于 作者: C Cary Xu

[TS-238]<feature>: support truncate function

上级 20081001
......@@ -111,6 +111,7 @@ extern int8_t tsCacheLastRow;
// tsdb
extern bool tsdbForceKeepFile;
extern bool tsdbForceTruncateFile;
extern bool tsdbForceCompactFile;
extern int32_t tsdbWalFlushSize;
......
......@@ -157,6 +157,7 @@ int32_t tsTsdbMetaCompactRatio = TSDB_META_COMPACT_RATIO;
// For backward compatibility
bool tsdbForceKeepFile = false;
bool tsdbForceCompactFile = false; // compact TSDB fileset forcibly
bool tsdbForceTruncateFile = false;
int32_t tsdbWalFlushSize = TSDB_DEFAULT_WAL_FLUSH_SIZE; // MB
// balance
......
......@@ -42,6 +42,8 @@ int32_t main(int32_t argc, char *argv[]) {
}
} else if (strcmp(argv[i], "-C") == 0) {
dump_config = 1;
} else if (strcmp(argv[i], "--force-truncate-file") == 0) {
tsdbForceTruncateFile = true;
} else if (strcmp(argv[i], "--force-compact-file") == 0) {
tsdbForceCompactFile = true;
} else if (strcmp(argv[i], "--force-keep-file") == 0) {
......
......@@ -191,11 +191,18 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeSync(pSyncVnode->vgId);
}
#ifdef __TRUNCATE_TEST__
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
return vnodeCompact(pCompactVnode->vgId);
}
#else
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
STruncateTblMsg *pTruncateMsg = rpcMsg->pCont;
return vnodeTruncate(pTruncateMsg);
}
#endif
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
......
......@@ -258,6 +258,11 @@ typedef struct SSchema {
int16_t bytes;
} SSchema;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct {
int32_t contLen;
int32_t vgId;
......@@ -396,6 +401,16 @@ typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
// N.B. JUST Utility for DEMO Implementation(not formal definition)
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
uint16_t nSpan;
char tableFname[TSDB_TABLE_FNAME_LEN];
STimeWindow span[];
} STruncateTblMsg;
// N.B. JUST Utility for DEMO Implementation(not formal definition)
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
......@@ -448,11 +463,6 @@ typedef struct STableIdInfo {
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
typedef struct STimeWindow {
TSKEY skey;
TSKEY ekey;
} STimeWindow;
typedef struct {
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
......@@ -546,6 +556,7 @@ typedef struct {
uint8_t role;
uint8_t replica;
uint8_t compact;
uint8_t truncate;
} SVnodeLoad;
typedef struct {
......
......@@ -96,6 +96,7 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo);
int8_t tsdbGetCompactState(STsdbRepo *repo);
int8_t tsdbGetTruncateState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {
uint64_t uid; // the unique table ID
......@@ -413,6 +414,9 @@ int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Truncate
int tsdbTruncate(STsdbRepo *pRepo, void *param);
// For TSDB Health Monitor
// no problem return true
......
......@@ -63,6 +63,7 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
int32_t vnodeTruncate(STruncateTblMsg *pMsg);
// vnodeMgmt
int32_t vnodeInitMgmt();
......
......@@ -146,7 +146,8 @@ typedef struct SVgObj {
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int32_t vgCfgVersion;
int8_t compact;
int8_t reserved1[8];
int8_t truncate;
int8_t reserved1[6];
int8_t updateEnd[4];
int32_t refCount;
int32_t numOfTables;
......
......@@ -352,7 +352,8 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup,NULL);
}
pVgroup->compact = pVload->compact;
pVgroup->compact = (int8_t)pVload->compact;
pVgroup->truncate = (int8_t)pVload->truncate;
}
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
......@@ -834,7 +835,11 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->compact;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->truncate;
cols++;
mnodeDecVgroupRef(pVgroup);
numOfRows++;
}
......
......@@ -16,8 +16,8 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_
typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T;
typedef enum { COMMIT_REQ, COMPACT_REQ, TRUNCATE_REQ, COMMIT_CONFIG_REQ } TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
int tsdbScheduleCommit(STsdbRepo *pRepo, void* param, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
......@@ -39,10 +39,10 @@ typedef struct STable {
SDataCol *lastCols;
int16_t maxColNum;
int16_t restoreColumnNum;
int16_t cacheLastConfigVersion;
bool hasRestoreLastColumn;
int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE()
T_REF_DECLARE() // int32_t
} STable;
typedef struct {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_TRUNCATE_H_
#define _TD_TSDB_TRUNCATE_H_
#ifdef __cplusplus
extern "C" {
#endif
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_TRUNCATE_H_ */
\ No newline at end of file
......@@ -66,6 +66,8 @@ extern "C" {
#include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Truncate
#include "tsdbTruncate.h"
// Commit Queue
#include "tsdbCommitQueue.h"
......@@ -97,6 +99,8 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
int8_t truncateState; // truncate state: inTruncate/noTruncate/waitingTruncate
pthread_t* pthread;
};
......
......@@ -28,6 +28,7 @@ typedef struct {
typedef struct {
TSDB_REQ_T req;
STsdbRepo *pRepo;
void * param;
} SReq;
static void *tsdbLoopCommit(void *arg);
......@@ -91,7 +92,7 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy(&(pQueue->lock));
}
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
int tsdbScheduleCommit(STsdbRepo *pRepo, void *param, TSDB_REQ_T req) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
......@@ -102,6 +103,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
((SReq *)pNode->data)->req = req;
((SReq *)pNode->data)->pRepo = pRepo;
((SReq *)pNode->data)->param = param;
pthread_mutex_lock(&(pQueue->lock));
......@@ -158,6 +160,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL;
void * param = NULL;
TSDB_REQ_T req;
setThreadName("tsdbCommit");
......@@ -183,19 +186,22 @@ static void *tsdbLoopCommit(void *arg) {
req = ((SReq *)pNode->data)->req;
pRepo = ((SReq *)pNode->data)->pRepo;
param = ((SReq *)pNode->data)->param;
if (req == COMMIT_REQ) {
tsdbCommitData(pRepo);
} else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo);
} else if (req == COMMIT_CONFIG_REQ) {
} else if (req == TRUNCATE_REQ) {
tsdbTruncateImpl(pRepo, param);
} else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo);
tsem_post(&(pRepo->readyToCommit));
} else {
ASSERT(0);
}
tfree(param);
listNodeFree(pNode);
}
......
......@@ -97,11 +97,15 @@ _err:
static int tsdbAsyncCompact(STsdbRepo *pRepo) {
if (pRepo->compactState != TSDB_NO_COMPACT) {
tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo));
return 0;
}
pRepo->compactState = TSDB_WAITING_COMPACT;
return 0;
}
pRepo->compactState = TSDB_WAITING_COMPACT;
tsem_wait(&(pRepo->readyToCommit));
return tsdbScheduleCommit(pRepo, COMPACT_REQ);
int code = tsdbScheduleCommit(pRepo, NULL, COMPACT_REQ);
if (code != 0) {
tsem_post(&(pRepo->readyToCommit));
}
return code;
}
static void tsdbStartCompact(STsdbRepo *pRepo) {
......
......@@ -215,6 +215,8 @@ int tsdbGetState(STsdbRepo *repo) { return repo->state; }
int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
int8_t tsdbGetTruncateState(STsdbRepo *repo) { return (int8_t)(repo->truncateState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
......
......@@ -286,7 +286,9 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
}
if (tsdbLockRepo(pRepo) < 0) return -1;
tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ);
if (tsdbScheduleCommit(pRepo, NULL, COMMIT_CONFIG_REQ) < 0) {
tsem_post(&(pRepo->readyToCommit));
}
if (tsdbUnlockRepo(pRepo) < 0) return -1;
tsem_wait(&(pRepo->readyToCommit));
......@@ -318,7 +320,9 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
tsdbScheduleCommit(pRepo, COMMIT_REQ);
if (tsdbScheduleCommit(pRepo, NULL, COMMIT_REQ) < 0) {
tsem_post(&(pRepo->readyToCommit));
}
if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
typedef struct {
STable * pTable;
SBlockIdx * pBlkIdx;
SBlockIdx bIndex;
SBlockInfo *pInfo;
} STableTruncateH;
typedef struct {
SRtn rtn;
SFSIter fsIter;
SArray * tbArray; // STableTruncateH, table array to cache table obj and block indexes
SReadH readh;
SDFileSet wSet;
SArray * aBlkIdx;
SArray * aSupBlk;
SDataCols *pDataCols;
} STruncateH;
#define TSDB_TRUNCATE_WSET(pTruncateH) (&((pTruncateH)->wSet))
#define TSDB_TRUNCATE_REPO(pTruncateH) TSDB_READ_REPO(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_HEAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_DATA_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_LAST_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_SMAD_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_SMAL_FILE(pTruncateH) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(pTruncateH), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_BUF(pTruncateH) TSDB_READ_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh))
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param);
static void tsdbStartTruncate(STsdbRepo *pRepo);
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static bool tsdbShouldTruncate(STruncateH *pTruncateH);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitCompTbArray(STruncateH *pTruncateH);
static void tsdbDestroyCompTbArray(STruncateH *pTruncateH);
static int tsdbCacheFSetIndex(STruncateH *pTruncateH);
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH);
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf, void **ppExBuf);
enum {
TSDB_NO_TRUNCATE,
TSDB_IN_TRUNCATE,
TSDB_WAITING_TRUNCATE,
};
int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param); }
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
int32_t code = 0;
// Step 1: check and clear cache
if ((code = tsdbTruncateCache(pRepo, param)) != 0) {
pRepo->code = terrno;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d failed to truncate since %s", REPO_ID(pRepo), tstrerror(terrno));
return NULL;
}
// Step 2: truncate and rebuild DFileSets
// Check if there are files in TSDB FS to truncate
if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) {
pRepo->truncateState = TSDB_NO_TRUNCATE;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d truncate over, no meta or data file", REPO_ID(pRepo));
return NULL;
}
tsdbStartTruncate(pRepo);
if (tsdbTruncateMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to truncate META data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbTruncateTSData(pRepo, param) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbEndTruncate(pRepo, TSDB_CODE_SUCCESS);
return NULL;
_err:
pRepo->code = terrno;
tsdbEndTruncate(pRepo, terrno);
return NULL;
}
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
// step 1: reset query cache(reset all or the specific cache)
// TODO ... check with Doctor Liao
// if(... <0){
// terrno = ...;
// return -1;
// }
// step 2: check and clear cache of last_row/last
// TODO: ... scan/check/clear stable/child table/common table
// if(... <0){
// terrno = ...;
// return -1;
// }
return 0;
}
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) {
// avoid repeated input of commands by end users in a short period of time
if (pRepo->truncateState != TSDB_NO_TRUNCATE) {
tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo));
return 0;
}
pRepo->truncateState = TSDB_WAITING_TRUNCATE;
// flush the mem data to disk synchronously(have impact on the compression rate)
tsdbSyncCommit(pRepo);
// truncate
tsem_wait(&(pRepo->readyToCommit));
int code = tsdbScheduleCommit(pRepo, param, TRUNCATE_REQ);
if (code < 0) {
tsem_post(&(pRepo->readyToCommit));
}
return code;
}
static void tsdbStartTruncate(STsdbRepo *pRepo) {
assert(pRepo->truncateState != TSDB_IN_TRUNCATE);
tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->truncateState = TSDB_IN_TRUNCATE;
}
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else {
tsdbEndFSTxn(pRepo);
}
pRepo->truncateState = TSDB_NO_TRUNCATE;
tsdbInfo("vgId:%d truncate over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit));
}
static int tsdbTruncateMeta(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
return 0;
}
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
STruncateH compactH;
SDFileSet *pSet = NULL;
tsdbDebug("vgId:%d start to truncate TS data", REPO_ID(pRepo));
if (tsdbInitTruncateH(&compactH, pRepo) < 0) {
return -1;
}
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
// Remove those expired files
if (pSet->fid < compactH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
}
#if 0
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not truncate", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
continue;
}
#endif
if (tsdbTruncateFSet(&compactH, pSet) < 0) {
tsdbDestroyTruncateH(&compactH);
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
}
tsdbDestroyTruncateH(&compactH);
tsdbDebug("vgId:%d truncate TS data over", REPO_ID(pRepo));
return 0;
}
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
SDiskID did;
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
if (tsdbTruncateFSetInit(pTruncateH, pSet) < 0) {
return -1;
}
if (!tsdbShouldTruncate(pTruncateH)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_TRUNCATE_REPO(pTruncateH), pSet, &(pTruncateH->rtn)) < 0) {
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
if (tsdbTruncateFSetImpl(pTruncateH) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbTruncateFSetEnd(pTruncateH);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(pTruncateH));
tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
}
tsdbTruncateFSetEnd(pTruncateH);
return 0;
}
static bool tsdbShouldTruncate(STruncateH *pTruncateH) {
if (tsdbForceTruncateFile) {
return true;
}
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
STableTruncateH *pTh;
SBlock * pBlock;
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh);
SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh);
int tblocks = 0; // total blocks
int nSubBlocks = 0; // # of blocks with sub-blocks
int nSmallBlocks = 0; // # of blocks with rows < defaultRows
int64_t tsize = 0;
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) {
pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i);
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
tblocks++;
pBlock = pTh->pInfo->blocks + bidx;
if (pBlock->numOfRows < defaultRows) {
nSmallBlocks++;
}
if (pBlock->numOfSubBlocks > 1) {
nSubBlocks++;
for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
tsize = tsize + iBlock->len;
}
} else if (pBlock->numOfSubBlocks == 1) {
tsize += pBlock->len;
} else {
ASSERT(0);
}
}
}
return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
}
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pTruncateH, 0, sizeof(*pTruncateH));
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH));
tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn));
tsdbFSIterInit(&(pTruncateH->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pTruncateH->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitCompTbArray(pTruncateH) < 0) {
tsdbDestroyTruncateH(pTruncateH);
return -1;
}
pTruncateH->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pTruncateH->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
return -1;
}
pTruncateH->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pTruncateH->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
return -1;
}
pTruncateH->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pTruncateH->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
return -1;
}
return 0;
}
static void tsdbDestroyTruncateH(STruncateH *pTruncateH) {
pTruncateH->pDataCols = tdFreeDataCols(pTruncateH->pDataCols);
pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx);
tsdbDestroyCompTbArray(pTruncateH);
tsdbDestroyReadH(&(pTruncateH->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH));
}
static int tsdbInitCompTbArray(STruncateH *pTruncateH) { // Init pComp->tbArray
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
pTruncateH->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (pTruncateH->tbArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; i++) {
STableTruncateH ch = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
ch.pTable = pMeta->tables[i];
}
if (taosArrayPush(pTruncateH->tbArray, &ch) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
return 0;
}
static void tsdbDestroyCompTbArray(STruncateH *pTruncateH) {
STableTruncateH *pTh;
if (pTruncateH->tbArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) {
pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i);
if (pTh->pTable) {
tsdbUnRefTable(pTh->pTable);
}
// pTh->pInfo = taosTZfree(pTh->pInfo);
tfree(pTh->pInfo);
}
pTruncateH->tbArray = taosArrayDestroy(pTruncateH->tbArray);
}
static int tsdbCacheFSetIndex(STruncateH *pTruncateH) {
SReadH *pReadH = &(pTruncateH->readh);
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
}
for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) {
STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid);
pTh->pBlkIdx = NULL;
if (pTh->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
return -1;
}
if (pReadH->pBlkIdx == NULL) continue;
pTh->bIndex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bIndex);
uint32_t originLen = 0;
if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) {
return -1;
}
}
return 0;
}
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet) {
taosArrayClear(pTruncateH->aBlkIdx);
taosArrayClear(pTruncateH->aSupBlk);
if (tsdbSetAndOpenReadFSet(&(pTruncateH->readh), pSet) < 0) {
return -1;
}
if (tsdbCacheFSetIndex(pTruncateH) < 0) {
tsdbCloseAndUnsetFSet(&(pTruncateH->readh));
return -1;
}
return 0;
}
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH) { tsdbCloseAndUnsetFSet(&(pTruncateH->readh)); }
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
SBlockIdx blkIdx;
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pTruncateH->aBlkIdx);
for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) {
STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid);
STSchema * pSchema;
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1, -1);
taosArrayClear(pTruncateH->aSupBlk);
if ((tdInitDataCols(pTruncateH->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdFreeSchema(pSchema);
return -1;
}
tdFreeSchema(pSchema);
// Loop to compact each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
// Load the block data
if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
return -1;
}
// Merge pTruncateH->pDataCols and pReadh->pDCols[0] and write data to file
if (pTruncateH->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
while (true) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pTruncateH->pDataCols->numOfRows);
tdMergeDataCols(pTruncateH->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx,
pCfg->update != TD_ROW_PARTIAL_UPDATE);
if (pTruncateH->pDataCols->numOfRows < defaultRows) {
break;
}
if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
tdResetDataCols(pTruncateH->pDataCols);
}
}
}
if (pTruncateH->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTh->pTable, pTruncateH->aSupBlk, NULL, ppBuf,
&blkIdx) < 0) {
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTruncateH->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
SBlock block;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH);
isLast = true;
} else {
pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDataCols,
&block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
\ No newline at end of file
......@@ -128,6 +128,22 @@ int32_t vnodeCompact(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeTruncate(STruncateTblMsg *pMsg) {
int32_t vgId = pMsg->vgId;
void * pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname);
// not care success or not
void *param = NULL;
tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param);
vnodeRelease(pVnode);
} else {
vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg = pVnode->tsdbCfg;
SSyncCfg syncCfg = pVnode->syncCfg;
......
......@@ -163,6 +163,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
pLoad->compact = (pVnode->tsdb != NULL) ? tsdbGetCompactState(pVnode->tsdb) : 0;
pLoad->truncate = (pVnode->tsdb != NULL) ? tsdbGetCompactState(pVnode->tsdb) : 0;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册