提交 9c97e086 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/dnode

......@@ -308,9 +308,11 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
taosFreeQitem(pBlock);
} else if (pTask->sinkType == TASK_SINK__SMA) {
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
taosFreeQitem(pBlock);
} else {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
taosWriteQitem(pTask->outputQueue->queue, pBlock);
......
......@@ -141,6 +141,8 @@ typedef struct SWal {
// ctl
int64_t refId;
TdThreadMutex mutex;
// ref
SHashObj *pRefHash; // ref -> SWalRef
// path
char path[WAL_PATH_LEN];
// reusable write head
......@@ -184,7 +186,7 @@ int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver);
int32_t walEndSnapshot(SWal *);
void walRestoreFromSnapshot(SWal *, int64_t ver);
int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
......@@ -199,6 +201,16 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalHead *pHead);
typedef struct {
int64_t refId;
int64_t ver;
} SWalRef;
SWalRef *walOpenRef(SWal *);
void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *);
// deprecated
#if 0
int32_t walRead(SWal *, SWalHead **, int64_t ver);
......
......@@ -1773,6 +1773,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
// assign data
// TODO
ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId;
......
......@@ -132,6 +132,7 @@ static inline void walResetVer(SWalVer* pVer) {
int walLoadMeta(SWal* pWal);
int walSaveMeta(SWal* pWal);
int walRemoveMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal);
int walCheckAndRepairMeta(SWal* pWal);
......
......@@ -419,3 +419,12 @@ int walLoadMeta(SWal* pWal) {
taosMemoryFree(buf);
return code;
}
int walRemoveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) return 0;
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
taosRemoveFile(fnameStr);
return 0;
}
......@@ -75,7 +75,7 @@ void walCleanUp() {
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = taosMemoryMalloc(sizeof(SWal));
SWal *pWal = taosMemoryCalloc(1, sizeof(SWal));
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
......@@ -92,6 +92,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
// init ref
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (pWal->pRefHash == NULL) {
taosMemoryFree(pWal);
return NULL;
}
// open meta
walResetVer(&pWal->vers);
pWal->pWriteLogTFile = NULL;
......@@ -100,6 +107,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal);
return NULL;
}
......@@ -115,12 +123,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal);
return NULL;
}
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal);
......@@ -130,6 +140,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) {
taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
......@@ -175,6 +186,7 @@ void walClose(SWal *pWal) {
walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
taosHashCleanup(pWal->pRefHash);
taosThreadMutexUnlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId);
......
......@@ -18,12 +18,47 @@
#include "tchecksum.h"
#include "walInt.h"
void walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
/*pWal->vers.firstVer = -1;*/
int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
void *pIter = NULL;
while (1) {
taosHashIterate(pWal->pRefHash, pIter);
if (pIter == NULL) break;
SWalRef *pRef = (SWalRef *)pIter;
if (pRef->ver != -1) {
taosHashCancelIterate(pWal->pRefHash, pIter);
return -1;
}
}
taosCloseFile(&pWal->pWriteLogTFile);
taosCloseFile(&pWal->pWriteIdxTFile);
if (pWal->vers.firstVer != -1) {
int32_t fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int32_t i = 0; i < fileSetSize; i++) {
SWalFileInfo *pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
}
}
walRemoveMeta(pWal);
pWal->writeCur = -1;
pWal->totSize = 0;
pWal->lastRollSeq = -1;
taosArrayClear(pWal->fileInfoSet);
pWal->vers.firstVer = -1;
pWal->vers.lastVer = ver;
pWal->vers.commitVer = ver - 1;
pWal->vers.snapshotVer = ver - 1;
pWal->vers.verInSnapshotting = -1;
taosThreadMutexUnlock(&pWal->mutex);
return 0;
}
int32_t walCommit(SWal *pWal, int64_t ver) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册