提交 2d50a39b 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): finished affected tables count right

上级 fd924682
...@@ -48,6 +48,7 @@ typedef struct { ...@@ -48,6 +48,7 @@ typedef struct {
SDataCols *pDCols; SDataCols *pDCols;
SControlDataInfo* pCtlInfo; SControlDataInfo* pCtlInfo;
SArray * aUpdates; SArray * aUpdates;
SArray * aAffectTables;
} SDeleteH; } SDeleteH;
...@@ -65,7 +66,7 @@ typedef struct { ...@@ -65,7 +66,7 @@ typedef struct {
static void tsdbStartDeleteTrans(STsdbRepo *pRepo); static void tsdbStartDeleteTrans(STsdbRepo *pRepo);
static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno); static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno);
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray); static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray, SArray* pAffectTables);
static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet); static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet);
static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo); static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo);
static void tsdbDestroyDeleteH(SDeleteH *pdh); static void tsdbDestroyDeleteH(SDeleteH *pdh);
...@@ -126,6 +127,9 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -126,6 +127,9 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
return -1; return -1;
} }
SArray* aUpdates = taosArrayInit(10, sizeof(STable *));
SArray* affectedTables = taosArrayInit(10, sizeof(int32_t)); // put tid
// start transaction // start transaction
tsdbStartDeleteTrans(pRepo); tsdbStartDeleteTrans(pRepo);
...@@ -134,8 +138,7 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -134,8 +138,7 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
goto _err; goto _err;
} }
SArray* aUpdates = taosArrayInit(10, sizeof(STable *)); if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates, affectedTables) < 0) {
if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;
} }
...@@ -145,18 +148,20 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -145,18 +148,20 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
// set affected tables number // set affected tables number
if(pCtlInfo->pRsp) { if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->numOfTables = pCtlInfo->tnum; pCtlInfo->pRsp->numOfTables = (int32_t)taosArrayGetSize(affectedTables);
} }
// update last row // update last row
tsdbUpdateLastRow(pRepo, aUpdates); tsdbUpdateLastRow(pRepo, aUpdates);
tsdbClearUpdates(aUpdates); tsdbClearUpdates(aUpdates);
taosArrayDestroy(&affectedTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
pRepo->code = terrno; pRepo->code = terrno;
tsdbEndDeleteTrans(pRepo, terrno); tsdbEndDeleteTrans(pRepo, terrno);
tsdbClearUpdates(aUpdates); tsdbClearUpdates(aUpdates);
taosArrayDestroy(&affectedTables);
return -1; return -1;
} }
...@@ -179,7 +184,7 @@ static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno) { ...@@ -179,7 +184,7 @@ static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray) { static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray, SArray* pAffectTables) {
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SDeleteH deleteH = {0}; SDeleteH deleteH = {0};
SDFileSet * pSet = NULL; SDFileSet * pSet = NULL;
...@@ -193,6 +198,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray ...@@ -193,6 +198,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
deleteH.aUpdates = pArray; deleteH.aUpdates = pArray;
deleteH.pCtlInfo = pCtlInfo; deleteH.pCtlInfo = pCtlInfo;
STimeWindow win = pCtlInfo->win; STimeWindow win = pCtlInfo->win;
deleteH.aAffectTables = pAffectTables;
int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision); int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision);
...@@ -371,6 +377,18 @@ void tsdbAddUpdates(SArray* pArray, STable* pTable) { ...@@ -371,6 +377,18 @@ void tsdbAddUpdates(SArray* pArray, STable* pTable) {
taosArrayAddBatch(pArray, &pTable, 1); taosArrayAddBatch(pArray, &pTable, 1);
} }
void tsdbAddAffectTables(SArray* pArray, int32_t tid) {
size_t cnt = taosArrayGetSize(pArray);
for ( size_t i = 0; i < cnt; i++) {
int32_t tid1 = *(int32_t *)taosArrayGet(pArray, i);
if ( tid1 == tid) {
// exist return
return ;
}
}
// append
taosArrayAddBatch(pArray, &tid, 1);
}
// init tbl array with pRepo->meta // init tbl array with pRepo->meta
static int tsdbInitDeleteTblArray(SDeleteH *pdh) { static int tsdbInitDeleteTblArray(SDeleteH *pdh) {
STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh); STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh);
...@@ -499,12 +517,7 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) { ...@@ -499,12 +517,7 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) {
++ pDstDCols->numOfRows; ++ pDstDCols->numOfRows;
} }
// affectedRows return delRows;
if (pdh->pCtlInfo->pRsp) {
pdh->pCtlInfo->pRsp->affectedRows += delRows;
}
return 0;
} }
// table in delete list // table in delete list
...@@ -565,11 +578,17 @@ int tsdbRemoveDelBlocks(SDeleteH *pdh, STableDeleteH * pItem) { ...@@ -565,11 +578,17 @@ int tsdbRemoveDelBlocks(SDeleteH *pdh, STableDeleteH * pItem) {
numOfBlocks -= delCnt; numOfBlocks -= delCnt;
} }
// set value // set current blocks num
pItem->pBlkIdx->numOfBlocks = numOfBlocks; pItem->pBlkIdx->numOfBlocks = numOfBlocks;
if(delRows > 0) {
// affected Rows
if(pdh->pCtlInfo->pRsp) { if(pdh->pCtlInfo->pRsp) {
pdh->pCtlInfo->pRsp->affectedRows += delRows; pdh->pCtlInfo->pRsp->affectedRows += delRows;
} }
// affected Tables
tsdbAddAffectTables(pdh->aAffectTables, pItem->pTable->tableId.tid);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -622,6 +641,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { ...@@ -622,6 +641,8 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
taosArrayClear(pdh->aSupBlk); taosArrayClear(pdh->aSupBlk);
taosArrayClear(pdh->aSubBlk); taosArrayClear(pdh->aSubBlk);
int32_t affectedRows = 0;
// Loop to truncate each block data // Loop to truncate each block data
for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) { for (int i = 0; i < pItem->pBlkIdx->numOfBlocks; ++i) {
SBlock *pBlock = pItem->pInfo->blocks + i; SBlock *pBlock = pItem->pInfo->blocks + i;
...@@ -636,7 +657,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { ...@@ -636,7 +657,7 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
return -1; return -1;
} }
tsdbFilterDataCols(pdh, pReadh->pDCols[0]); affectedRows += tsdbFilterDataCols(pdh, pReadh->pDCols[0]);
if (pdh->pDCols->numOfRows <= 0) { if (pdh->pDCols->numOfRows <= 0) {
continue; continue;
} }
...@@ -669,6 +690,15 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) { ...@@ -669,6 +690,15 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
tsdbAddUpdates(pdh->aUpdates, pItem->pTable); tsdbAddUpdates(pdh->aUpdates, pItem->pTable);
} }
if (affectedRows > 0) {
// affectedRows
if (pdh->pCtlInfo->pRsp) {
pdh->pCtlInfo->pRsp->affectedRows += affectedRows;
}
// affectTables
tsdbAddAffectTables(pdh->aAffectTables, pItem->pTable->tableId.tid);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册