提交 4d6c9c1d 编写于 作者: C Cary Xu

filter TS data

上级 914df93f
......@@ -29,7 +29,7 @@ typedef struct {
SDFileSet wSet;
SArray * aBlkIdx;
SArray * aSupBlk;
SDataCols *pDataCols;
SDataCols *pDCols;
void * param; // STruncateTblMsg *pMsg
} STruncateH;
......@@ -59,7 +59,8 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param);
static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet);
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH);
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf,
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf);
enum {
......@@ -71,6 +72,7 @@ enum {
// Entry point for a truncate request on a repo.
// Forwards the repo handle and the opaque request parameter unchanged to
// tsdbAsyncTruncate() and returns whatever that call returns.
int tsdbTruncate(STsdbRepo *pRepo, void *param) {
  return tsdbAsyncTruncate(pRepo, param);
}
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
ASSERT(param != NULL);
int32_t code = 0;
// Step 1: check and clear cache
if ((code = tsdbTruncateCache(pRepo, param)) != 0) {
......@@ -180,15 +182,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
STruncateH truncateH = {0};
SDFileSet * pSet = NULL;
STruncateTblMsg *pMsg = (STruncateTblMsg *)param;
ASSERT(pMsg != NULL);
truncateH.param = pMsg;
tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pMsg->uid);
if (tsdbInitTruncateH(&truncateH, pRepo) < 0) {
return -1;
}
truncateH.param = pMsg;
int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision);
......@@ -217,7 +217,6 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) {
continue;
}
#endif
if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH);
tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
......@@ -307,8 +306,8 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
return -1;
}
pTruncateH->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pTruncateH->pDataCols == NULL) {
pTruncateH->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
if (pTruncateH->pDCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyTruncateH(pTruncateH);
return -1;
......@@ -318,7 +317,7 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) {
}
static void tsdbDestroyTruncateH(STruncateH *pTruncateH) {
pTruncateH->pDataCols = tdFreeDataCols(pTruncateH->pDataCols);
pTruncateH->pDCols = tdFreeDataCols(pTruncateH->pDCols);
pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk);
pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx);
tsdbDestroyTruncateTblArray(pTruncateH);
......@@ -423,16 +422,56 @@ static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet) {
// Counterpart of tsdbTruncateFSetInit(): hands the read handle embedded in the
// truncate handle to tsdbCloseAndUnsetFSet().
static void tsdbTruncateFSetEnd(STruncateH *pTruncateH) {
  SReadH *pReadh = &(pTruncateH->readh);
  tsdbCloseAndUnsetFSet(pReadh);
}
// Tells the caller whether a block has to be loaded and filtered row by row.
//
// At present this always answers true and ignores both arguments, so every
// block is treated as overlapping the truncate window. The intended check,
// still disabled, walks all spans of the request and reports true as soon as
// the block's key range intersects one of them:
//
//   STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
//   for (uint16_t i = 0; i < pMsg->nSpan; ++i) {
//     STimeWindow tw = pMsg->span[i];
//     if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) {
//       return true;
//     }
//   }
//   return false;
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) {
  (void)pTruncateH;
  (void)pBlock;
  return true;
}
// Copy every row of pSrcDCols whose timestamp is outside the truncate window
// into pTruncateH->pDCols; rows inside the window are dropped.
//
// pTruncateH  truncate handle; ->param is read as the STruncateTblMsg request
//             and ->pDCols is reset and then filled as the destination.
// pSrcDCols   source rows; the timestamp of row i is read from column 0.
//
// Always returns 0. The number of surviving rows is left in
// pTruncateH->pDCols->numOfRows for the caller to inspect.
//
// NOTE(review): only span[0] of the request is consulted although the message
// carries nSpan entries -- confirm whether multi-span requests are expected.
// NOTE(review): the result of dataColAppendVal() is not checked here; verify
// whether it can fail (e.g. on allocation) and needs propagating.
static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) {
  STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
  SDataCols *      pDstDCols = pTruncateH->pDCols;

  // Start from an empty destination that mirrors the source's layout fields.
  tdResetDataCols(pDstDCols);
  pDstDCols->maxCols = pSrcDCols->maxCols;
  pDstDCols->maxPoints = pSrcDCols->maxPoints;
  pDstDCols->numOfCols = pSrcDCols->numOfCols;
  pDstDCols->sversion = pSrcDCols->sversion;

  for (int i = 0; i < pSrcDCols->numOfRows; ++i) {
    int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i);

    // Rows inside the closed interval [skey, ekey] are the ones being truncated.
    // No per-row output here: the caller logs the before/after row counts.
    if ((tsKey >= pMsg->span[0].skey) && (tsKey <= pMsg->span[0].ekey)) {
      continue;
    }

    for (int j = 0; j < pSrcDCols->numOfCols; ++j) {
      // A column is appended only once either side holds data for it.
      if (pSrcDCols->cols[j].len > 0 || pDstDCols->cols[j].len > 0) {
        dataColAppendVal(pDstDCols->cols + j, tdGetColDataOfRow(pSrcDCols->cols + j, i), pDstDCols->numOfRows,
                         pDstDCols->maxPoints, 0);
      }
    }
    ++pDstDCols->numOfRows;
  }

  return 0;
}
static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param;
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
SBlockIdx blkIdx = {0};
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
// STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pTruncateH->readh);
SBlockIdx blkIdx = {0};
void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH));
void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH));
void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH));
// int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pTruncateH->aBlkIdx);
......@@ -448,7 +487,12 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
taosArrayClear(pTruncateH->aSupBlk);
if ((tdInitDataCols(pTruncateH->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
uint64_t uid = pTblHandle->pTable->tableId.uid;
// if(uid != pMsg->uid) {
// TODO: copy the block data directly
// }
if ((tdInitDataCols(pTruncateH->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdFreeSchema(pSchema);
......@@ -461,49 +505,35 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pTblHandle->pInfo->blocks + i;
// Load the block data
// Copy the Blocks directly if TS is not interleaved.
if (!tsdbBlockInterleaved(pTruncateH, pBlock)) {
// tsdbWriteBlockAndDataToFile();
continue;
}
// Otherwise load the block data and copy the specific rows.
if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) {
return -1;
}
if (uid == pMsg->uid) {
tsdbFilterDataCols(pTruncateH, pReadh->pDCols[0]);
tsdbDebug("vgId:%d uid %" PRIu64 " matched, filter block data from rows %d to %d rows", REPO_ID(pRepo), uid,
pReadh->pDCols[0]->numOfRows, pTruncateH->pDCols->numOfRows);
if (pTruncateH->pDCols->numOfRows <= 0) continue;
// Merge pTruncateH->pDataCols and pReadh->pDCols[0] and write data to file
if ((pTruncateH->pDataCols->numOfRows == 0) && (pBlock->numOfRows >= defaultRows)) {
// if pTruncateH->pDataCols has no data and pBlock->numOfRows >= defaultRows, write data directly
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
while (true) {
// no data left to merge anymore
if ((pReadh->pDCols[0]->numOfRows - ridx) == 0) break;
// min(data left, target space left)
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pTruncateH->pDataCols->numOfRows);
tdMergeDataCols(pTruncateH->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx,
pCfg->update != TD_ROW_PARTIAL_UPDATE);
if (pTruncateH->pDataCols->numOfRows < defaultRows) {
// continue to read more blocks
break;
}
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
tdResetDataCols(pTruncateH->pDataCols);
tsdbDebug("vgId:%d uid %" PRIu64 " not matched, copy block data directly\n", REPO_ID(pRepo), uid);
if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
}
}
if (pTruncateH->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, ppBuf,
&blkIdx) < 0) {
if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL,
ppBuf, &blkIdx) < 0) {
return -1;
}
......@@ -520,7 +550,7 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) {
return 0;
}
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf,
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
STsdbCfg * pCfg = REPO_CFG(pRepo);
......@@ -528,9 +558,9 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
bool isLast = false;
SBlock block = {0};
ASSERT(pDataCols->numOfRows > 0);
ASSERT(pDCols->numOfRows > 0);
if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH);
isLast = true;
} else {
......@@ -539,7 +569,7 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDataCols,
isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDCols,
&block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
......@@ -550,4 +580,34 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
}
return 0;
}
\ No newline at end of file
}
// static int tsdbWriteBlockAndDataToFile(STruncateH *pTruncateH, STable *pTable, SBlock *pSupBlock, void **ppBuf,
// void **ppCBuf, void **ppExBuf) {
// STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH);
// SDFile * pDFile = NULL;
// bool isLast = false;
// ASSERT(pSupBlock->numOfRows > 0);
// if (pSupBlock->last) {
// pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH);
// isLast = true;
// } else {
// pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH);
// isLast = false;
// }
// if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
// isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH),
// pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
// return -1;
// }
// if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) {
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// return -1;
// }
// return 0;
// }
\ No newline at end of file
......@@ -143,10 +143,10 @@ int32_t vnodeTruncate(STruncateTblMsg *pMsg) {
// not care success or not
STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow));
param->vgId = 2;
param->uid = 562949986978794;
param->uid = 562949986979009;
param->nSpan = 1;
param->span[0].skey = 1637417678000;
param->span[0].ekey = 1637417679000;
param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001;
if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册