未验证 提交 004f1e44 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14753 from taosdata/feature/stream

fix(wal): rollback
......@@ -15,30 +15,37 @@
#include "vnd.h"
const SVnodeCfg vnodeCfgDefault = {
.vgId = -1,
.dbname = "",
.dbId = 0,
.szPage = 4096,
.szCache = 256,
.szBuf = 96 * 1024 * 1024,
.isHeap = false,
.isWeak = 0,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.update = 1,
.compression = 2,
.slLevel = 5,
.days = 14400,
.minRows = 100,
.maxRows = 4096,
.keep2 = 5256000,
.keep0 = 5256000,
.keep1 = 5256000},
.walCfg =
{.vgId = -1, .fsyncPeriod = 0, .retentionPeriod = 0, .rollPeriod = 0, .segSize = 0, .level = TAOS_WAL_WRITE},
.hashBegin = 0,
.hashEnd = 0,
.hashMethod = 0};
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
.dbname = "",
.dbId = 0,
.szPage = 4096,
.szCache = 256,
.szBuf = 96 * 1024 * 1024,
.isHeap = false,
.isWeak = 0,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.update = 1,
.compression = 2,
.slLevel = 5,
.days = 14400,
.minRows = 100,
.maxRows = 4096,
.keep2 = 5256000,
.keep0 = 5256000,
.keep1 = 5256000},
.walCfg =
{
.vgId = -1,
.fsyncPeriod = 0,
.retentionPeriod = -1,
.rollPeriod = -1,
.segSize = -1,
.retentionSize = -1,
.level = TAOS_WAL_WRITE,
},
.hashBegin = 0,
.hashEnd = 0,
.hashMethod = 0};
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
// TODO
......@@ -79,7 +86,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *pNodeRetentions = tjsonCreateArray();
tjsonAddItemToObject(pJson, "retentions", pNodeRetentions);
for (int32_t i = 0; i < nRetention; ++i) {
SJson * pNodeRetention = tjsonCreateObject();
SJson *pNodeRetention = tjsonCreateObject();
const SRetention *pRetention = pCfg->tsdbCfg.retentions + i;
tjsonAddIntegerToObject(pNodeRetention, "freq", pRetention->freq);
tjsonAddIntegerToObject(pNodeRetention, "freqUnit", pRetention->freqUnit);
......@@ -156,7 +163,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2, code);
if (code < 0) return -1;
SJson * pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) {
nRetention = TSDB_RETENTION_MAX;
......
......@@ -230,6 +230,7 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT(0);
return -1;
}
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
smaPreCommit(pVnode->pSma);
......@@ -278,6 +279,7 @@ int vnodeCommit(SVnode *pVnode) {
smaPostCommit(pVnode->pSma);
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool;
pVnode->pPool = pVnode->onCommit;
......
......@@ -117,6 +117,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir));
/*pVnode->config.walCfg.retentionSize = 2000;*/
/*pVnode->config.walCfg.segSize = 200;*/
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
......
......@@ -208,7 +208,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qInfo("task %d receive dispatch rsp", pTask->taskId);
qDebug("task %d receive dispatch rsp", pTask->taskId);
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
......@@ -242,7 +242,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
}
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
......
......@@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qInfo("stream continue dispatching: task %d", pTask->taskId);
qDebug("stream continue dispatching: task %d", pTask->taskId);
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
......
......@@ -141,34 +141,32 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo);
int oldSz = 0;
if (pWal->fileInfoSet) {
oldSz = taosArrayGetSize(pWal->fileInfoSet);
}
int newSz = taosArrayGetSize(pLogInfoArray);
if (oldSz > newSz) {
taosArrayPopFrontBatch(pWal->fileInfoSet, oldSz - newSz);
} else if (oldSz < newSz) {
for (int i = oldSz; i < newSz; i++) {
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray);
if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) {
for (int i = metaFileNum; i < actualFileNum; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo);
}
}
taosArrayDestroy(pLogInfoArray);
pWal->writeCur = newSz - 1;
if (newSz > 0) {
pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, newSz - 1);
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t file_size = 0;
taosStatFile(fnameStr, &file_size, NULL);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
if (oldSz != newSz || pLastFileInfo->fileSize != file_size) {
pLastFileInfo->fileSize = file_size;
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
......
......@@ -99,7 +99,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// delete files
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = pWal->writeCur; i < fileSetSize; i++) {
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
......@@ -113,18 +113,21 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
if (code < 0) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
......@@ -133,12 +136,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pLogTFile == NULL) {
ASSERT(0);
// TODO
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
if (code < 0) {
ASSERT(0);
// TODO
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -148,6 +153,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
......@@ -205,15 +211,22 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver) {
pWal->vers.verInSnapshotting = ver;
// check file rolling
if (pWal->cfg.retentionPeriod == 0) {
taosThreadMutexLock(&pWal->mutex);
walRoll(pWal);
taosThreadMutexUnlock(&pWal->mutex);
}
return 0;
}
int32_t walEndSnapshot(SWal *pWal) {
int32_t code = 0;
taosThreadMutexLock(&pWal->mutex);
int64_t ver = pWal->vers.verInSnapshotting;
if (ver == -1) return 0;
if (ver == -1) {
code = -1;
goto END;
};
pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec();
......@@ -229,7 +242,7 @@ int32_t walEndSnapshot(SWal *pWal) {
}
// iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) ||
if ((pWal->cfg.retentionSize != -1 && newTotSize > pWal->cfg.retentionSize) ||
(pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time
deleteCnt++;
......@@ -259,12 +272,14 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.verInSnapshotting = -1;
// save snapshot ver, commit ver
int code = walSaveMeta(pWal);
code = walSaveMeta(pWal);
if (code < 0) {
return -1;
goto END;
}
return 0;
END:
taosThreadMutexUnlock(&pWal->mutex);
return code;
}
int walRoll(SWal *pWal) {
......@@ -273,14 +288,14 @@ int walRoll(SWal *pWal) {
code = taosCloseFile(&pWal->pWriteIdxTFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
goto END;
}
}
if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
goto END;
}
}
TdFilePtr pIdxTFile, pLogTFile;
......@@ -291,18 +306,20 @@ int walRoll(SWal *pWal) {
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pIdxTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
code = -1;
goto END;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
code = -1;
goto END;
}
// terrno set inner
// error code was set inner
code = walRollFileInfo(pWal);
if (code != 0) {
return -1;
goto END;
}
// switch file
......@@ -312,7 +329,9 @@ int walRoll(SWal *pWal) {
ASSERT(pWal->writeCur >= 0);
pWal->lastRollSeq = walGetSeq();
return 0;
END:
return code;
}
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册