提交 876c0068 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): fixed last_row() return empty.

上级 3a56d8a8
......@@ -58,13 +58,11 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
tscHandleSubDeleteError(param, tres, code);
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return;
}
......@@ -76,42 +74,27 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
* NOTE: thread safe is required.
*/
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(code == taos_errno(pSql));
int32_t sent = 0;
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
//tscReissueSubquery(trsupport, pSql, code, &sent);
if (sent) {
return;
}
} else {
tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
}
tscError(":CDEL 0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
tscHandleSubDeleteError(param, tres, pParentSql->res.code);
if(!sent) {
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return;
}
tscDebug("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self,
pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
// do merge
// success do total count
pParentSql->res.numOfRows += pSql->res.numOfRows;
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
tfree(pSql->param);
return ;
}
......@@ -206,7 +189,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
pRes->qId = 0x1; // hack the qhandle check
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
......@@ -237,7 +219,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SSqlObj *pNew = tscCreateSTableSubDelete(pSql, pVgroupMsg, trs);
if (pNew == NULL) {
tscError("0x%"PRIx64"CDEL failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
......
......@@ -116,9 +116,10 @@ STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo);
UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable, bool lastKey);
void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]);
int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey);
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
ASSERT(pRepo != NULL);
......
......@@ -32,9 +32,11 @@ typedef struct {
SBlockIdx * pBlkIdx;
SBlockIdx bIndex;
SBlockInfo *pInfo;
bool update; // need update lastrow
} STableDeleteH;
typedef struct {
STsdbRepo *pRepo;
SRtn rtn;
SFSIter fsIter;
SArray * tblArray; // STableDeleteH, table array to cache table obj and block indexes
......@@ -45,6 +47,7 @@ typedef struct {
SArray * aSubBlk;
SDataCols *pDCols;
SControlDataInfo* pCtlInfo;
SArray * aUpdates;
} SDeleteH;
......@@ -63,7 +66,7 @@ typedef struct {
static void tsdbStartDelete(STsdbRepo *pRepo);
static void tsdbEndDelete(STsdbRepo *pRepo, int eno);
static int tsdbDeleteMeta(STsdbRepo *pRepo);
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo);
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray);
static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet);
static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo);
static void tsdbDestroyDeleteH(SDeleteH *pdh);
......@@ -91,6 +94,24 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) {
return ret;
}
static void tsdbUpdateLastRow(STsdbRepo* pRepo, SArray * pArray) {
size_t cnt = taosArrayGetSize(pArray);
for (size_t i = 0; i < cnt; ++i) {
STable* pTable = taosArrayGetP(pArray, i);
tsdbLoadLastCache(pRepo, pTable, true);
}
}
static void tsdbClearUpdates(SArray * pArray) {
size_t cnt = taosArrayGetSize(pArray);
for (size_t i = 0; i < cnt; ++i) {
STable* pTable = taosArrayGetP(pArray, i);
tsdbUnRefTable(pTable);
}
// destory
taosArrayDestroy(&pArray);
}
static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
int32_t code = 0;
// Step 1: check and clear cache
......@@ -117,17 +138,23 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
goto _err;
}
if (tsdbDeleteTSData(pRepo, pCtlInfo) < 0) {
SArray* aUpdates = taosArrayInit(10, sizeof(STable *));
if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates) < 0) {
tsdbError("vgId:%d failed to truncate TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbEndDelete(pRepo, TSDB_CODE_SUCCESS);
// update last row
tsdbUpdateLastRow(pRepo, aUpdates);
tsdbClearUpdates(aUpdates);
return TSDB_CODE_SUCCESS;
_err:
pRepo->code = terrno;
tsdbEndDelete(pRepo, terrno);
tsdbClearUpdates(aUpdates);
return -1;
}
......@@ -174,27 +201,28 @@ static int tsdbDeleteMeta(STsdbRepo *pRepo) {
return 0;
}
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDeleteH truncateH = {0};
SDeleteH deleteH = {0};
SDFileSet * pSet = NULL;
tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->tids[0]);
if (tsdbInitDeleteH(&truncateH, pRepo) < 0) {
if (tsdbInitDeleteH(&deleteH, pRepo) < 0) {
return -1;
}
truncateH.pCtlInfo = pCtlInfo;
STimeWindow win = pCtlInfo->win;
deleteH.aUpdates = pArray;
deleteH.pCtlInfo = pCtlInfo;
STimeWindow win = pCtlInfo->win;
int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision);
ASSERT(sFid <= eFid);
while ((pSet = tsdbFSIterNext(&(truncateH.fsIter)))) {
while ((pSet = tsdbFSIterNext(&(deleteH.fsIter)))) {
// remove expired files
if (pSet->fid < truncateH.rtn.minFid) {
if (pSet->fid < deleteH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
......@@ -202,7 +230,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
if ((pSet->fid < sFid) || (pSet->fid > eFid)) {
tsdbDebug("vgId:%d no need to truncate FSET %d, sFid %d, eFid %d", REPO_ID(pRepo), pSet->fid, sFid, eFid);
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(truncateH.rtn)) < 0) {
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(deleteH.rtn)) < 0) {
return -1;
}
continue;
......@@ -217,8 +245,8 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
#endif
if (pCtlInfo->command & CMD_DELETE_DATA) {
if (tsdbFSetDelete(&truncateH, pSet) < 0) {
tsdbDestroyDeleteH(&truncateH);
if (tsdbFSetDelete(&deleteH, pSet) < 0) {
tsdbDestroyDeleteH(&deleteH);
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
......@@ -228,7 +256,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
}
tsdbDestroyDeleteH(&truncateH);
tsdbDestroyDeleteH(&deleteH);
tsdbDebug("vgId:%d truncate TS data over", REPO_ID(pRepo));
return 0;
}
......@@ -295,6 +323,7 @@ static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo) {
memset(pdh, 0, sizeof(*pdh));
TSDB_FSET_SET_CLOSED(TSDB_DELETE_WSET(pdh));
pdh->pRepo = pRepo;
tsdbGetRtnSnap(pRepo, &(pdh->rtn));
tsdbFSIterInit(&(pdh->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
......@@ -349,6 +378,21 @@ static void tsdbDestroyDeleteH(SDeleteH *pdh) {
tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh));
}
void tsdbAddUpdates(SArray* pArray, STable* pTable) {
size_t cnt = taosArrayGetSize(pArray);
for ( size_t i = 0; i < cnt; i++) {
STable* pt = taosArrayGetP(pArray, i);
if ( pt == pTable) {
// found
return ;
}
}
// ref count ++
tsdbRefTable(pTable);
// append
taosArrayAddBatch(pArray, &pTable, 1);
}
// init tbl array with pRepo->meta
static int tsdbInitDeleteTblArray(SDeleteH *pdh) {
STsdbRepo *pRepo = TSDB_DELETE_REPO(pdh);
......@@ -640,7 +684,14 @@ static int tsdbModifyBlocks(SDeleteH *pdh, STableDeleteH *pItem) {
return -1;
}
return 0;
// update new last row in last row was deleted
TSKEY lastKey = pItem->pTable->lastKey;
if(lastKey >= pdh->pCtlInfo->win.skey && lastKey <= pdh->pCtlInfo->win.ekey) {
// update lastkey and lastrow
tsdbAddUpdates(pdh->aUpdates, pItem->pTable);
}
return TSDB_CODE_SUCCESS;
}
// keep intact blocks info and write to head file then save offset to blkIdx
......
......@@ -30,7 +30,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
// Function declaration
int32_t tsdbCreateRepo(int repoid) {
......@@ -814,8 +813,7 @@ out:
return err;
}
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL);
int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx, bool onlyKey) {
if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) {
return -1;
}
......@@ -844,15 +842,22 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
}
TSKEY lastKey = memRowKey(lastRow);
// during the load data in file, new data would be inserted and last row has been updated
TSDB_WLOCK_TABLE(pTable);
if (pTable->lastRow == NULL) {
if (onlyKey) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
TSDB_WUNLOCK_TABLE(pTable);
} else {
TSDB_WUNLOCK_TABLE(pTable);
// set
if (pTable->lastRow == NULL) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
lastRow = NULL;
}
}
TSDB_WUNLOCK_TABLE(pTable);
if (lastRow) {
taosTZfree(lastRow);
}
......@@ -908,7 +913,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
if (pIdx && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey;
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
......@@ -933,10 +938,11 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable, bool lastKey) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
bool onlyKey = false;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
......@@ -956,14 +962,19 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
if(!lastKey)
return 0;
onlyKey = true;
}
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) {
return 0;
if (!lastKey)
return 0;
onlyKey = true;
cacheLastRowTableNum = 1;
}
if (tsdbInitReadH(&readh, pRepo) < 0) {
......@@ -997,7 +1008,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, onlyKey) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
......@@ -1116,7 +1127,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
pTable->lastKey = pIdx->maxKey;
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx, false) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
......
......@@ -635,7 +635,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
code = tsdbLoadLastCache(pRepo, pTable);
code = tsdbLoadLastCache(pRepo, pTable, false);
if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册