提交 43389aa5 编写于 作者: A Alex Duan

add delete

上级 a99b0376
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCDELETE_H
#define TDENGINE_TSCDELETE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "qTableMeta.h"
int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCDELETE_H
......@@ -54,7 +54,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(void **param);
SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, int32_t cmd, __async_cb_func_t fp);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcmdtype.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscDelete.h"
int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if(!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
// not super table
pSql->cmd.active = pQueryInfo;
return tscBuildAndSendRequest(pSql, pQueryInfo);
}
//
// super table
//
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// pRes->code check only serves in launching super table sub-queries
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
return pRes->code;
}
tExtMemBuffer **pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
pRes->qId = 0x1; // hack the qhandle check
uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t ret = doInitSubState(pSql, numOfSub);
if (ret != 0) {
tscAsyncResultOnError(pSql);
return ret;
}
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, &nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = ret;
tscAsyncResultOnError(pSql);
tfree(pDesc);
tfree(pMemoryBuf);
return ret;
}
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pState->numOfSub; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
break;
}
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
trs->localBufferSize = nBufferSize + sizeof(tFilePage);
if (trs->localBuffer == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs);
break;
}
trs->localBuffer->num = 0;
trs->subqueryIndex = i;
trs->pParentSql = pSql;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
assert(pNewQueryInfo->tsBuf != NULL);
}
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
trs->subqueryIndex);
}
if (i < pState->numOfSub) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
doConcurrentlySendSubQueries(pSql);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -3380,7 +3380,7 @@ int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if(UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return buildSTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
} else {
return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
}
......@@ -3390,6 +3390,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_DELETE_DATA] = tscBuildDelDataMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
......@@ -3425,7 +3426,6 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_DELETE_DATA] = tscBuildDelDataMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
......
......@@ -2716,7 +2716,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->subqueryIndex = i;
trs->pParentSql = pSql;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL, tscRetrieveDataRes, CMD_SQL_SELECT);
if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs->localBuffer);
......@@ -3165,12 +3165,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
}
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, int32_t cmd, __async_cb_func_t fp) {
const int32_t table_index = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pPQueryInfo = tscGetQueryInfo(pCmd); // Parent SQueryInfo
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
SSqlObj *pNew = createSubqueryObj(pSql, table_index, fp, trsupport, cmd, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pNew->cmd);
......
......@@ -30,6 +30,7 @@
#include "ttimer.h"
#include "ttokendef.h"
#include "cJSON.h"
#include "tscDelete.h"
#ifdef HTTP_EMBEDDED
#include "httpInt.h"
......@@ -4168,6 +4169,12 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
if (pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
(*pSql->fp)(pSql->param, pSql, 0);
return;
} else if (pSql->cmd.command == TSDB_SQL_DELETE_DATA) {
code = executeDelete(pSql, pQueryInfo);
if (code != TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
}
return ;
}
if (pSql->cmd.command == TSDB_SQL_SELECT) {
......
......@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_TRUNCATE_H_
#define _TD_TSDB_TRUNCATE_H_
#ifndef _TD_TSDB_DELETE_H_
#define _TD_TSDB_DELETE_H_
#ifdef __cplusplus
extern "C" {
......@@ -38,4 +38,4 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
}
#endif
#endif /* _TD_TSDB_TRUNCATE_H_ */
\ No newline at end of file
#endif /* _TD_TSDB_DELETE_H_ */
\ No newline at end of file
......@@ -16,7 +16,7 @@
#ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_
#include "tsdbTruncate.h"
#include "tsdbDelete.h"
typedef struct {
int rowsInserted;
int rowsUpdated;
......
......@@ -66,8 +66,8 @@ extern "C" {
#include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Truncate
#include "tsdbTruncate.h"
// Delete
#include "tsdbDelete.h"
// Commit Queue
#include "tsdbCommitQueue.h"
......
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#include "tsdbTruncate.h"
#include "tsdbDelete.h"
enum {
TSDB_NO_TRUNCATE,
TSDB_IN_TRUNCATE,
TSDB_WAITING_TRUNCATE,
TSDB_NO_DELETE,
TSDB_IN_DELETE,
TSDB_WAITING_DELETE,
};
enum BlockSolve {
......@@ -32,12 +32,12 @@ typedef struct {
SBlockIdx * pBlkIdx;
SBlockIdx bIndex;
SBlockInfo *pInfo;
} STableTruncateH;
} STableDeleteH;
typedef struct {
SRtn rtn;
SFSIter fsIter;
SArray * tblArray; // STableTruncateH, table array to cache table obj and block indexes
SArray * tblArray; // STableDeleteH, table array to cache table obj and block indexes
SReadH readh;
SDFileSet wSet;
SArray * aBlkIdx;
......@@ -45,46 +45,44 @@ typedef struct {
SArray * aSubBlk;
SDataCols *pDCols;
SControlDataInfo* pCtlInfo;
} STruncateH;
#define TSDB_TRUNCATE_WSET(ptru) (&((ptru)->wSet))
#define TSDB_TRUNCATE_REPO(ptru) TSDB_READ_REPO(&((ptru)->readh))
#define TSDB_TRUNCATE_HEAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_DATA_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_LAST_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_SMAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_SMAL_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_BUF(ptru) TSDB_READ_BUF(&((ptru)->readh))
#define TSDB_TRUNCATE_COMP_BUF(ptru) TSDB_READ_COMP_BUF(&((ptru)->readh))
#define TSDB_TRUNCATE_EXBUF(ptru) TSDB_READ_EXBUF(&((ptru)->readh))
static void tsdbStartTruncate(STsdbRepo *pRepo);
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
static int tsdbFSetTruncate(STruncateH *ptru, SDFileSet *pSet);
static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *ptru, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *ptru);
static int tsdbInitTruncateTblArray(STruncateH *ptru);
static void tsdbDestroyTruncateTblArray(STruncateH *ptru);
static int tsdbCacheFSetIndex(STruncateH *ptru);
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbFSetInit(STruncateH *ptru, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *ptru);
static int tsdbTruncateFSetImpl(STruncateH *ptru);
static int tsdbFSetDeleteImpl(STruncateH *ptru);
static int tsdbBlockSolve(STruncateH *ptru, SBlock *pBlock);
static int tsdbWriteBlockToFile(STruncateH *ptru, STable *pTable, SDataCols *pDCols, void **ppBuf,
} SDeleteH;
#define TSDB_DELETE_WSET(pdh) (&((pdh)->wSet))
#define TSDB_DELETE_REPO(pdh) TSDB_READ_REPO(&((pdh)->readh))
#define TSDB_DELETE_HEAD_FILE(pdh) TSDB_DFILE_IN_SET(TSDB_DELETE_WSET(pdh), TSDB_FILE_HEAD)
#define TSDB_DELETE_DATA_FILE(pdh) TSDB_DFILE_IN_SET(TSDB_DELETE_WSET(pdh), TSDB_FILE_DATA)
#define TSDB_DELETE_LAST_FILE(pdh) TSDB_DFILE_IN_SET(TSDB_DELETE_WSET(pdh), TSDB_FILE_LAST)
#define TSDB_DELETE_SMAD_FILE(pdh) TSDB_DFILE_IN_SET(TSDB_DELETE_WSET(pdh), TSDB_FILE_SMAD)
#define TSDB_DELETE_SMAL_FILE(pdh) TSDB_DFILE_IN_SET(TSDB_DELETE_WSET(pdh), TSDB_FILE_SMAL)
#define TSDB_DELETE_BUF(pdh) TSDB_READ_BUF(&((pdh)->readh))
#define TSDB_DELETE_COMP_BUF(pdh) TSDB_READ_COMP_BUF(&((pdh)->readh))
#define TSDB_DELETE_EXBUF(pdh) TSDB_READ_EXBUF(&((pdh)->readh))
static void tsdbStartDelete(STsdbRepo *pRepo);
static void tsdbEndDelete(STsdbRepo *pRepo, int eno);
static int tsdbDeleteMeta(STsdbRepo *pRepo);
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet);
static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo);
static void tsdbDestroyDeleteH(SDeleteH *pdh);
static int tsdbInitDeleteTblArray(SDeleteH *pdh);
static void tsdbDestroyDeleteTblArray(SDeleteH *pdh);
static int tsdbCacheFSetIndex(SDeleteH *pdh);
static int tsdbDeleteCache(STsdbRepo *pRepo, void *param);
static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet);
static void tsdbDeleteFSetEnd(SDeleteH *pdh);
static int tsdbFSetDeleteImpl(SDeleteH *pdh);
static int tsdbBlockSolve(SDeleteH *pdh, SBlock *pBlock);
static int tsdbWriteBlockToFile(SDeleteH *pdh, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf, SBlock * pBlock);
static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
// delete
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) {
int32_t ret = tsdbTruncateImplCommon(pRepo, pCtlInfo);
int32_t ret = tsdbDeleteImplCommon(pRepo, pCtlInfo);
if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->affectedRows = htonl(pCtlInfo->pRsp->affectedRows);
pCtlInfo->pRsp->code = ret;
......@@ -93,10 +91,10 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) {
return ret;
}
static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
int32_t code = 0;
// Step 1: check and clear cache
if ((code = tsdbTruncateCache(pRepo, pCtlInfo)) != 0) {
if ((code = tsdbDeleteCache(pRepo, pCtlInfo)) != 0) {
pRepo->code = terrno;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d failed to truncate since %s", REPO_ID(pRepo), tstrerror(terrno));
......@@ -106,34 +104,34 @@ static int tsdbTruncateImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo)
// Step 2: truncate and rebuild DFileSets
// Check if there are files in TSDB FS to truncate
if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) {
pRepo->truncateState = TSDB_NO_TRUNCATE;
pRepo->truncateState = TSDB_NO_DELETE;
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d truncate over, no meta or data file", REPO_ID(pRepo));
return -1;
}
tsdbStartTruncate(pRepo);
tsdbStartDelete(pRepo);
if (tsdbTruncateMeta(pRepo) < 0) {
if (tsdbDeleteMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to truncate META data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbTruncateTSData(pRepo, pCtlInfo) < 0) {
if (tsdbDeleteTSData(pRepo, pCtlInfo) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbEndTruncate(pRepo, TSDB_CODE_SUCCESS);
tsdbEndDelete(pRepo, TSDB_CODE_SUCCESS);
return TSDB_CODE_SUCCESS;
_err:
pRepo->code = terrno;
tsdbEndTruncate(pRepo, terrno);
tsdbEndDelete(pRepo, terrno);
return -1;
}
static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
static int tsdbDeleteCache(STsdbRepo *pRepo, void *param) {
// step 1: reset query cache(reset all or the specific cache)
// TODO ... check with Doctor Liao
// if(... <0){
......@@ -151,39 +149,39 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param) {
return 0;
}
static void tsdbStartTruncate(STsdbRepo *pRepo) {
assert(pRepo->truncateState != TSDB_IN_TRUNCATE);
static void tsdbStartDelete(STsdbRepo *pRepo) {
assert(pRepo->truncateState != TSDB_IN_DELETE);
tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->truncateState = TSDB_IN_TRUNCATE;
pRepo->truncateState = TSDB_IN_DELETE;
}
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno) {
static void tsdbEndDelete(STsdbRepo *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else {
tsdbEndFSTxn(pRepo);
}
pRepo->truncateState = TSDB_NO_TRUNCATE;
pRepo->truncateState = TSDB_NO_DELETE;
tsdbInfo("vgId:%d truncate over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit));
}
static int tsdbTruncateMeta(STsdbRepo *pRepo) {
static int tsdbDeleteMeta(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
return 0;
}
static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
STruncateH truncateH = {0};
SDeleteH truncateH = {0};
SDFileSet * pSet = NULL;
tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->ctlData.tids[0]);
if (tsdbInitTruncateH(&truncateH, pRepo) < 0) {
if (tsdbInitDeleteH(&truncateH, pRepo) < 0) {
return -1;
}
......@@ -218,15 +216,10 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
}
#endif
if (pCtlInfo->ctlData.command == CMD_TRUNCATE) {
if (tsdbFSetTruncate(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
} else if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) {
if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) {
if (tsdbFSetDelete(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbDestroyDeleteH(&truncateH);
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
......@@ -236,29 +229,29 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
}
tsdbDestroyTruncateH(&truncateH);
tsdbDestroyDeleteH(&truncateH);
tsdbDebug("vgId:%d truncate TS data over", REPO_ID(pRepo));
return 0;
}
static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru);
static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh);
SDiskID did = {0};
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(ptru);
SDFileSet *pWSet = TSDB_DELETE_WSET(pdh);
tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbFSetInit(ptru, pSet) < 0) {
if (tsdbFSetInit(pdh, pSet) < 0) {
return -1;
}
// Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id));
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pdh->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(ptru);
tsdbDeleteFSetEnd(pdh);
return -1;
}
......@@ -282,146 +275,90 @@ static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) {
return -1;
}
if (tsdbFSetDeleteImpl(ptru) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbTruncateFSetEnd(ptru);
if (tsdbFSetDeleteImpl(pdh) < 0) {
tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh));
tsdbRemoveDFileSet(TSDB_DELETE_WSET(pdh));
tsdbDeleteFSetEnd(pdh);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(ptru));
tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_DELETE_WSET(pdh));
tsdbDebug("vgId:%d FSET %d truncate data over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(ptru);
tsdbDeleteFSetEnd(pdh);
return 0;
}
static int tsdbFSetTruncate(STruncateH *ptru, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru);
SDiskID did = {0};
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(ptru);
tsdbDebug("vgId:%d start to truncate table in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbFSetInit(ptru, pSet) < 0) {
return -1;
}
// Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(ptru);
return -1;
}
// Only .head is created, use original .data/.last/.smad/.smal
tsdbInitDFileSetEx(pWSet, pSet);
pWSet->state = 0;
SDFile *pHeadFile = TSDB_DFILE_IN_SET(pWSet, TSDB_FILE_HEAD);
tsdbInitDFile(pHeadFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pHeadFile, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCloseDFile(pHeadFile);
tsdbRemoveDFile(pHeadFile);
return -1;
}
tsdbCloseDFile(pHeadFile);
if (tsdbOpenDFileSet(pWSet, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file set %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tstrerror(terrno));
return -1;
}
if (tsdbTruncateFSetImpl(ptru) < 0) {
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbRemoveDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbTruncateFSetEnd(ptru);
return -1;
}
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_TRUNCATE_WSET(ptru));
tsdbDebug("vgId:%d FSET %d truncate table over", REPO_ID(pRepo), pSet->fid);
tsdbTruncateFSetEnd(ptru);
return 0;
}
static int tsdbInitTruncateH(STruncateH *ptru, STsdbRepo *pRepo) {
static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(ptru, 0, sizeof(*ptru));
memset(pdh, 0, sizeof(*pdh));
TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(ptru));
TSDB_FSET_SET_CLOSED(TSDB_DELETE_WSET(pdh));
tsdbGetRtnSnap(pRepo, &(ptru->rtn));
tsdbFSIterInit(&(ptru->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
tsdbGetRtnSnap(pRepo, &(pdh->rtn));
tsdbFSIterInit(&(pdh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(ptru->readh), pRepo) < 0) {
if (tsdbInitReadH(&(pdh->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitTruncateTblArray(ptru) < 0) {
tsdbDestroyTruncateH(ptru);
if (tsdbInitDeleteTblArray(pdh) < 0) {
tsdbDestroyDeleteH(pdh);
return -1;
}
ptru->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (ptru->aBlkIdx == NULL) {
pdh->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pdh->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(ptru);
tsdbDestroyDeleteH(pdh);
return -1;
}
ptru->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (ptru->aSupBlk == NULL) {
pdh->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pdh->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(ptru);
tsdbDestroyDeleteH(pdh);
return -1;
}
ptru->aSubBlk = taosArrayInit(20, sizeof(SBlock));
if (ptru->aSubBlk == NULL) {
pdh->aSubBlk = taosArrayInit(20, sizeof(SBlock));
if (pdh->aSubBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(ptru);
tsdbDestroyDeleteH(pdh);
return -1;
}
ptru->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (ptru->pDCols == NULL) {
pdh->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pdh->pDCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(ptru);
tsdbDestroyDeleteH(pdh);
return -1;
}
return 0;
}
static void tsdbDestroyTruncateH(STruncateH *ptru) {
ptru->pDCols = tdFreeDataCols(ptru->pDCols);
ptru->aSupBlk = taosArrayDestroy(&ptru->aSupBlk);
ptru->aSubBlk = taosArrayDestroy(&ptru->aSubBlk);
ptru->aBlkIdx = taosArrayDestroy(&ptru->aBlkIdx);
tsdbDestroyTruncateTblArray(ptru);
tsdbDestroyReadH(&(ptru->readh));
tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(ptru));
static void tsdbDestroyDeleteH(SDeleteH *pdh) {
pdh->pDCols = tdFreeDataCols(pdh->pDCols);
pdh->aSupBlk = taosArrayDestroy(&pdh->aSupBlk);
pdh->aSubBlk = taosArrayDestroy(&pdh->aSubBlk);
pdh->aBlkIdx = taosArrayDestroy(&pdh->aBlkIdx);
tsdbDestroyDeleteTblArray(pdh);
tsdbDestroyReadH(&(pdh->readh));
tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh));
}
// init tbl array with pRepo->meta
static int tsdbInitTruncateTblArray(STruncateH *ptru) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru);
static int tsdbInitDeleteTblArray(SDeleteH *pdh) {
STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
ptru->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH));
if (ptru->tblArray == NULL) {
pdh->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableDeleteH));
if (pdh->tblArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
......@@ -429,13 +366,13 @@ static int tsdbInitTruncateTblArray(STruncateH *ptru) {
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; ++i) {
STableTruncateH tbl = {0};
STableDeleteH tbl = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
tbl.pTable = pMeta->tables[i];
}
if (taosArrayPush(ptru->tblArray, &tbl) == NULL) {
if (taosArrayPush(pdh->tblArray, &tbl) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
......@@ -446,13 +383,13 @@ static int tsdbInitTruncateTblArray(STruncateH *ptru) {
return 0;
}
static void tsdbDestroyTruncateTblArray(STruncateH *ptru) {
STableTruncateH *pItem = NULL;
static void tsdbDestroyDeleteTblArray(SDeleteH *pdh) {
STableDeleteH *pItem = NULL;
if (ptru->tblArray == NULL) return;
if (pdh->tblArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(ptru->tblArray); ++i) {
pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, i);
for (size_t i = 0; i < taosArrayGetSize(pdh->tblArray); ++i) {
pItem = (STableDeleteH *)taosArrayGet(pdh->tblArray, i);
if (pItem->pTable) {
tsdbUnRefTable(pItem->pTable);
}
......@@ -460,19 +397,19 @@ static void tsdbDestroyTruncateTblArray(STruncateH *ptru) {
tfree(pItem->pInfo);
}
ptru->tblArray = taosArrayDestroy(&ptru->tblArray);
pdh->tblArray = taosArrayDestroy(&pdh->tblArray);
}
static int tsdbCacheFSetIndex(STruncateH *ptru) {
SReadH *pReadH = &(ptru->readh);
static int tsdbCacheFSetIndex(SDeleteH *pdh) {
SReadH *pReadH = &(pdh->readh);
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
}
size_t cnt = taosArrayGetSize(ptru->tblArray);
size_t cnt = taosArrayGetSize(pdh->tblArray);
for (size_t tid = 1; tid < cnt; ++tid) {
STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid);
STableDeleteH *pItem = (STableDeleteH *)taosArrayGet(pdh->tblArray, tid);
pItem->pBlkIdx = NULL;
if (pItem->pTable == NULL)
......@@ -493,30 +430,30 @@ static int tsdbCacheFSetIndex(STruncateH *ptru) {
return 0;
}
static int tsdbFSetInit(STruncateH *ptru, SDFileSet *pSet) {
taosArrayClear(ptru->aBlkIdx);
taosArrayClear(ptru->aSupBlk);
static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet) {
taosArrayClear(pdh->aBlkIdx);
taosArrayClear(pdh->aSupBlk);
// open
if (tsdbSetAndOpenReadFSet(&(ptru->readh), pSet) < 0) {
if (tsdbSetAndOpenReadFSet(&(pdh->readh), pSet) < 0) {
return -1;
}
// load index to cache
if (tsdbCacheFSetIndex(ptru) < 0) {
tsdbCloseAndUnsetFSet(&(ptru->readh));
if (tsdbCacheFSetIndex(pdh) < 0) {
tsdbCloseAndUnsetFSet(&(pdh->readh));
return -1;
}
return 0;
}
static void tsdbTruncateFSetEnd(STruncateH *ptru) { tsdbCloseAndUnsetFSet(&(ptru->readh)); }
static void tsdbDeleteFSetEnd(SDeleteH *pdh) { tsdbCloseAndUnsetFSet(&(pdh->readh)); }
static int32_t tsdbFilterDataCols(STruncateH *ptru, SDataCols *pSrcDCols) {
SDataCols * pDstDCols = ptru->pDCols;
SControlData* pCtlData = &ptru->pCtlInfo->ctlData;
static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) {
SDataCols * pDstDCols = pdh->pDCols;
SControlData* pCtlData = &pdh->pCtlInfo->ctlData;
int32_t delRows = 0;
tdResetDataCols(pDstDCols);
......@@ -542,71 +479,27 @@ static int32_t tsdbFilterDataCols(STruncateH *ptru, SDataCols *pSrcDCols) {
}
// affectedRows
if (ptru->pCtlInfo->pRsp) {
ptru->pCtlInfo->pRsp->affectedRows += delRows;
if (pdh->pCtlInfo->pRsp) {
pdh->pCtlInfo->pRsp->affectedRows += delRows;
}
return 0;
}
// table in delete list
bool tableInDel(STruncateH* ptru, int32_t tid) {
for (int32_t i = 0; i < ptru->pCtlInfo->ctlData.tnum; i++) {
if (tid == ptru->pCtlInfo->ctlData.tids[i])
bool tableInDel(SDeleteH* pdh, int32_t tid) {
for (int32_t i = 0; i < pdh->pCtlInfo->ctlData.tnum; i++) {
if (tid == pdh->pCtlInfo->ctlData.tids[i])
return true;
}
return false;
}
static int tsdbTruncateFSetImpl(STruncateH *ptru) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(ptru);
// SReadH * pReadh = &(ptru->readh);
SBlockIdx * pBlkIdx = NULL;
void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru));
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(ptru));
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(ptru));
taosArrayClear(ptru->aBlkIdx);
for (size_t tid = 1; tid < taosArrayGetSize(ptru->tblArray); ++tid) {
STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid);
pBlkIdx = pItem->pBlkIdx;
if (pItem->pTable == NULL || pItem->pBlkIdx == NULL) continue;
taosArrayClear(ptru->aSupBlk);
if (!tableInDel(ptru, tid)) {
if ((pBlkIdx->numOfBlocks > 0) && (taosArrayPush(ptru->aBlkIdx, (const void *)(pBlkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
} else {
// Loop to mark delete flag for each block data
tsdbDebug("vgId:%d tid %ld matched to truncate", REPO_ID(pRepo), tid);
// for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) {
// SBlock *pBlock = pItem->pInfo->blocks + i;
// if (tsdbWriteBlockToFile(ptru, pItem->pTable, ptru->pDCols, ppBuf, ppCBuf, ppExBuf) <
// 0) {
// return -1;
// }
// }
}
}
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(ptru), ptru->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
// if pBlock is border block return true else return false
static int tsdbBlockSolve(STruncateH *ptru, SBlock *pBlock) {
static int tsdbBlockSolve(SDeleteH *pdh, SBlock *pBlock) {
// delete window
STimeWindow* pdel = &ptru->pCtlInfo->ctlData.win;
STimeWindow* pdel = &pdh->pCtlInfo->ctlData.win;
// do nothing for no delete
if(pBlock->keyFirst > pdel->ekey || pBlock->keyLast < pdel->skey)
......@@ -621,7 +514,7 @@ static int tsdbBlockSolve(STruncateH *ptru, SBlock *pBlock) {
}
// remove del block from pBlockInfo
int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) {
int tsdbRemoveDelBlocks(SDeleteH *pdh, STableDeleteH * pItem) {
// loop
int numOfBlocks = pItem->pBlkIdx->numOfBlocks;
int from = -1;
......@@ -629,7 +522,7 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) {
for (int i = numOfBlocks - 1; i >= 0; --i) {
SBlock *pBlock = pItem->pInfo->blocks + i;
int32_t solve = tsdbBlockSolve(ptru, pBlock);
int32_t solve = tsdbBlockSolve(pdh, pBlock);
if (solve == BLOCK_DELETE) {
if (from == -1)
from = i;
......@@ -653,35 +546,35 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) {
// set value
pItem->pBlkIdx->numOfBlocks = numOfBlocks;
if(ptru->pCtlInfo->pRsp) {
ptru->pCtlInfo->pRsp->affectedRows += delRows;
if(pdh->pCtlInfo->pRsp) {
pdh->pCtlInfo->pRsp->affectedRows += delRows;
}
return TSDB_CODE_SUCCESS;
}
static void tsdbAddBlock(STruncateH *ptru, STableTruncateH *pItem, SBlock *pBlock) {
static void tsdbAddBlock(SDeleteH *pdh, STableDeleteH *pItem, SBlock *pBlock) {
// append sub if have
if (pBlock->numOfSubBlocks > 1) {
int64_t offset = taosArrayGetSize(ptru->aSubBlk) * sizeof(SBlock);
int64_t offset = taosArrayGetSize(pdh->aSubBlk) * sizeof(SBlock);
SBlock *jBlock = POINTER_SHIFT(pItem->pInfo, pBlock->offset);;
for (int j = 0; j < pBlock->numOfSubBlocks; j++) {
taosArrayPush(ptru->aSubBlk, (const void *)jBlock++);
taosArrayPush(pdh->aSubBlk, (const void *)jBlock++);
}
// set new offset if have sub
pBlock->offset = offset;
}
// append super
taosArrayPush(ptru->aSupBlk, (const void *)pBlock);
taosArrayPush(pdh->aSupBlk, (const void *)pBlock);
}
// need modify blocks
static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
SReadH * pReadh = &(ptru->readh);
void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(ptru));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(ptru));
static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
SReadH * pReadh = &(pdh->readh);
void ** ppBuf = &(TSDB_DELETE_BUF(pdh));
void ** ppCBuf = &(TSDB_DELETE_COMP_BUF(pdh));
void ** ppExBuf = &(TSDB_DELETE_EXBUF(pdh));
STSchema *pSchema = NULL;
SBlockIdx blkIdx = {0};
......@@ -690,7 +583,7 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
return -1;
}
if ((tdInitDataCols(ptru->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
if ((tdInitDataCols(pdh->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdFreeSchema(pSchema);
......@@ -699,21 +592,21 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
tdFreeSchema(pSchema);
// delete block
tsdbRemoveDelBlocks(ptru, pItem);
tsdbRemoveDelBlocks(pdh, pItem);
if(pItem->pBlkIdx->numOfBlocks == 0) {
// all blocks were deleted
return TSDB_CODE_SUCCESS;
}
taosArrayClear(ptru->aSupBlk);
taosArrayClear(ptru->aSubBlk);
taosArrayClear(pdh->aSupBlk);
taosArrayClear(pdh->aSubBlk);
// Loop to truncate each block data
for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pItem->pInfo->blocks + i;
int32_t solve = tsdbBlockSolve(ptru, pBlock);
int32_t solve = tsdbBlockSolve(pdh, pBlock);
if (solve == BLOCK_READ) {
tsdbAddBlock(ptru, pItem, pBlock);
tsdbAddBlock(pdh, pItem, pBlock);
continue;
}
......@@ -722,28 +615,28 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
return -1;
}
tsdbFilterDataCols(ptru, pReadh->pDCols[0]);
if (ptru->pDCols->numOfRows <= 0) {
tsdbFilterDataCols(pdh, pReadh->pDCols[0]);
if (pdh->pDCols->numOfRows <= 0) {
continue;
}
SBlock newBlock = {0};
if (tsdbWriteBlockToFile(ptru, pItem->pTable, ptru->pDCols, ppBuf, ppCBuf, ppExBuf, &newBlock) < 0) {
if (tsdbWriteBlockToFile(pdh, pItem->pTable, pdh->pDCols, ppBuf, ppCBuf, ppExBuf, &newBlock) < 0) {
return -1;
}
// add new block to info
tsdbAddBlock(ptru, pItem, &newBlock);
tsdbAddBlock(pdh, pItem, &newBlock);
}
// write block info for each table
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(ptru), pItem->pTable, ptru->aSupBlk, ptru->aSubBlk,
if (tsdbWriteBlockInfoImpl(TSDB_DELETE_HEAD_FILE(pdh), pItem->pTable, pdh->aSupBlk, pdh->aSubBlk,
ppBuf, &blkIdx) < 0) {
return -1;
}
// each table's blkIdx
if (blkIdx.numOfBlocks > 0 && taosArrayPush(ptru->aBlkIdx, (const void *)(&blkIdx)) == NULL) {
if (blkIdx.numOfBlocks > 0 && taosArrayPush(pdh->aBlkIdx, (const void *)(&blkIdx)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -752,27 +645,27 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
}
// keep intact blocks info and write to head file then save offset to blkIdx
static int tsdbKeepIntactBlocks(STruncateH *ptru, STableTruncateH * pItem) {
static int tsdbKeepIntactBlocks(SDeleteH *pdh, STableDeleteH * pItem) {
// init
SBlockIdx blkIdx = {0};
taosArrayClear(ptru->aSupBlk);
taosArrayClear(ptru->aSubBlk);
taosArrayClear(pdh->aSupBlk);
taosArrayClear(pdh->aSubBlk);
for (int32_t i = 0; i < pItem->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pItem->pInfo->blocks + i;
tsdbAddBlock(ptru, pItem, pBlock);
tsdbAddBlock(pdh, pItem, pBlock);
}
// write block info for one table
void **ppBuf = &(TSDB_TRUNCATE_BUF(ptru));
int32_t ret = tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(ptru), pItem->pTable, ptru->aSupBlk,
ptru->aSubBlk, ppBuf, &blkIdx);
void **ppBuf = &(TSDB_DELETE_BUF(pdh));
int32_t ret = tsdbWriteBlockInfoImpl(TSDB_DELETE_HEAD_FILE(pdh), pItem->pTable, pdh->aSupBlk,
pdh->aSubBlk, ppBuf, &blkIdx);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
// each table's blkIdx
if (blkIdx.numOfBlocks > 0 && taosArrayPush(ptru->aBlkIdx, (const void *)&blkIdx) == NULL) {
if (blkIdx.numOfBlocks > 0 && taosArrayPush(pdh->aBlkIdx, (const void *)&blkIdx) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
......@@ -780,43 +673,43 @@ static int tsdbKeepIntactBlocks(STruncateH *ptru, STableTruncateH * pItem) {
return ret;
}
static int tsdbFSetDeleteImpl(STruncateH *ptru) {
void ** ppBuf = &(TSDB_TRUNCATE_BUF(ptru));
static int tsdbFSetDeleteImpl(SDeleteH *pdh) {
void ** ppBuf = &(TSDB_DELETE_BUF(pdh));
int32_t ret = TSDB_CODE_SUCCESS;
// 1.INIT
taosArrayClear(ptru->aBlkIdx);
taosArrayClear(pdh->aBlkIdx);
for (size_t tid = 1; tid < taosArrayGetSize(ptru->tblArray); ++tid) {
STableTruncateH *pItem = (STableTruncateH *)taosArrayGet(ptru->tblArray, tid);
for (size_t tid = 1; tid < taosArrayGetSize(pdh->tblArray); ++tid) {
STableDeleteH *pItem = (STableDeleteH *)taosArrayGet(pdh->tblArray, tid);
// no table in this tid position
if (pItem->pTable == NULL || pItem->pBlkIdx == NULL)
continue;
// 2.WRITE INFO OF EACH TABLE BLOCK INFO TO HEAD FILE
if (tableInDel(ptru, tid)) {
if (tableInDel(pdh, tid)) {
// modify blocks info and write to head file then save offset to blkIdx
ret = tsdbModifyBlocks(ptru, pItem);
ret = tsdbModifyBlocks(pdh, pItem);
} else {
// keep intact blocks info and write to head file then save offset to blkIdx
ret = tsdbKeepIntactBlocks(ptru, pItem);
ret = tsdbKeepIntactBlocks(pdh, pItem);
}
if (ret != TSDB_CODE_SUCCESS)
return ret;
} // tid for
// 3.WRITE INDEX OF ALL TABLE'S BLOCK TO HEAD FILE
if (tsdbWriteBlockIdx(TSDB_TRUNCATE_HEAD_FILE(ptru), ptru->aBlkIdx, ppBuf) < 0) {
if (tsdbWriteBlockIdx(TSDB_DELETE_HEAD_FILE(pdh), pdh->aBlkIdx, ppBuf) < 0) {
return -1;
}
return ret;
}
static int tsdbWriteBlockToFile(STruncateH *ptru, STable *pTable, SDataCols *pDCols, void **ppBuf,
static int tsdbWriteBlockToFile(SDeleteH *pdh, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf, SBlock *pBlock) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru);
STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile = NULL;
bool isLast = false;
......@@ -824,15 +717,15 @@ static int tsdbWriteBlockToFile(STruncateH *ptru, STable *pTable, SDataCols *pDC
ASSERT(pDCols->numOfRows > 0);
if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_TRUNCATE_LAST_FILE(ptru);
pDFile = TSDB_DELETE_LAST_FILE(pdh);
isLast = true;
} else {
pDFile = TSDB_TRUNCATE_DATA_FILE(ptru);
pDFile = TSDB_DELETE_DATA_FILE(pdh);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
isLast ? TSDB_TRUNCATE_SMAL_FILE(ptru) : TSDB_TRUNCATE_SMAD_FILE(ptru), pDCols,
isLast ? TSDB_DELETE_SMAL_FILE(pdh) : TSDB_DELETE_SMAD_FILE(pdh), pDCols,
pBlock, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册