提交 67463396 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

# Conflicts:
#	source/libs/stream/src/streamSnapshot.c
......@@ -14,13 +14,13 @@
*/
#include "os.h"
#include "ttimer.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "ttimer.h"
#ifdef __cplusplus
extern "C" {
......@@ -391,10 +391,12 @@ typedef struct SStreamMeta {
SMgmtInfo mgmtInfo;
int32_t chkptNotReadyTasks;
SArray* checkpointSaved;
SArray* checkpointInUse;
int32_t checkpointCap;
SRWLatch checkpointDirLock;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -420,7 +422,7 @@ typedef struct {
typedef struct {
int32_t type;
int64_t stage; //nodeId from upstream task
int64_t stage; // nodeId from upstream task
int64_t streamId;
int32_t taskId;
int32_t srcVgId;
......@@ -696,6 +698,9 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
......
......@@ -200,8 +200,11 @@ void taosArrayClear(SArray* pArray);
* @param pArray
* @param fp
*/
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
void taosArrayClearP(SArray* pArray, void (*fp)(void*));
void* taosArrayDestroy(SArray* pArray);
void taosArrayDestroyP(SArray* pArray, FDelete fp);
......
......@@ -48,6 +48,7 @@ typedef struct SStreamVnodeRevertIndex {
static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList;
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
......@@ -1063,7 +1064,6 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
}
pStream->checkpointFreq = checkpointId;
pStream->checkpointId = checkpointId;
pStream->checkpointFreq = taosGetTimestampMs();
atomic_store_64(&pStream->currentTick, 0);
......@@ -1100,7 +1100,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
......
......@@ -330,6 +330,10 @@ int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData);
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter);
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback);
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId);
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter);
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================
......
......@@ -41,13 +41,17 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SStreamMeta* meta = pTq->pStreamMeta;
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
int64_t chkpId = meta ? meta->chkpId : 0;
SStreamSnapReader* pSnapReader = NULL;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) {
if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1;
} else {
code = -1;
......@@ -131,14 +135,18 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pWriter->sver = sver;
pWriter->ever = ever;
sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM);
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "received");
taosMkDir(tdir);
SStreamSnapWriter* pSnapWriter = NULL;
if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) {
goto _err;
}
tqDebug("vgId:%d, vnode stream-state snapshot writer opened", TD_VID(pTq->pVnode));
tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir);
pWriter->pWriterImpl = pSnapWriter;
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code));
......@@ -151,13 +159,21 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
int32_t code = 0;
tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode));
code = streamSnapWriterClose(pWriter->pWriterImpl, rollback);
return code;
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
if (code == 0) {
code = streamStateLoadTasks(pWriter);
}
taosMemoryFree(pWriter);
return code;
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode));
code = streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
return code;
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}
......@@ -248,31 +248,30 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
}
}
// if (!pReader->streamStateDone) {
// if (pReader->pStreamStateReader == NULL) {
// code =
// streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver,
// &pReader->pStreamStateReader);
// if (code) {
// pReader->streamStateDone = 1;
// pReader->pStreamStateReader = NULL;
// goto _err;
// }
// }
// code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
// if (code) {
// goto _err;
// } else {
// if (*ppData) {
// goto _exit;
// } else {
// pReader->streamStateDone = 1;
// code = streamStateSnapReaderClose(pReader->pStreamStateReader);
// if (code) goto _err;
// pReader->pStreamStateReader = NULL;
// }
// }
// }
if (!pReader->streamStateDone) {
if (pReader->pStreamStateReader == NULL) {
code =
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
if (code) {
pReader->streamStateDone = 1;
pReader->pStreamStateReader = NULL;
goto _err;
}
}
code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->streamStateDone = 1;
code = streamStateSnapReaderClose(pReader->pStreamStateReader);
if (code) goto _err;
pReader->pStreamStateReader = NULL;
}
}
}
// RSMA ==============
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
......@@ -419,6 +418,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
pWriter->pStreamStateWriter = NULL;
if (code) goto _exit;
}
if (pWriter->pRsmaSnapWriter) {
......@@ -527,7 +530,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_STREAM_STATE: {
case SNAP_DATA_STREAM_STATE_BACKEND: {
if (pWriter->pStreamStateWriter == NULL) {
code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
if (code) goto _err;
......
......@@ -48,22 +48,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return NULL;
}
int32_t len = strlen(path) + 20;
char* streamPath = taosMemoryCalloc(1, len);
sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err;
}
char* tpath = taosMemoryCalloc(1, strlen(path) + 64);
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
......@@ -100,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointCap = 8;
taosInitRWLatch(&pMeta->checkpointDirLock);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->chkpDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = chkpId;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
if (pMeta->streamBackend == NULL) {
......@@ -118,9 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
......@@ -128,7 +117,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
return pMeta;
_err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
......@@ -141,6 +129,66 @@ _err:
return NULL;
}
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// stop all running tasking and reopen later
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->schedTimer) {
taosTmrStop(pTask->schedTimer);
pTask->schedTimer = NULL;
}
if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
tFreeStreamTask(pTask);
}
// close stream backend
// streamBackendCleanup(pMeta->streamBackend);
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(defaultPath);
char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
if (taosRenameFile(newPath, defaultPath) < 0) {
taosMemoryFree(defaultPath);
taosMemoryFree(newPath);
return -1;
}
pMeta->streamBackend = streamBackendInit(pMeta->path, 0);
if (pMeta->streamBackend == NULL) {
return -1;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosHashClear(pMeta->pTasks);
taosArrayClear(pMeta->pTaskList);
taosHashClear(pMeta->pTaskBackendUnique);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->chkpInUse);
return 0;
}
void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta");
if (pMeta == NULL) {
......@@ -168,8 +216,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosArrayDestroy(pMeta->checkpointSaved);
taosArrayDestroy(pMeta->checkpointInUse);
taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->chkpInUse);
taosMemoryFree(pMeta);
qDebug("end to close stream meta");
......
......@@ -64,6 +64,7 @@ struct SStreamSnapReader {
int64_t sver;
int64_t ever;
SStreamSnapHandle handle;
int64_t checkpointId;
};
struct SStreamSnapWriter {
void* pMeta;
......@@ -78,31 +79,60 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname)
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
do { \
sprintf(fullname, "%s/%s", path, file); \
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
} while (0)
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int ret = 0;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = taosStatFile(fullname, sz, NULL);
taosMemoryFree(fullname);
return ret;
}
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
return taosOpenFile(fullname, opt);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) {
// impl later
int len = strlen(path);
char* tdir = taosMemoryCalloc(1, len + 128);
memcpy(tdir, path, len);
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
} else {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
}
int32_t code = 0;
TdDirPtr pDir = taosOpenDir(path);
TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) {
qError("stream-state failed to open %s", tdir);
goto _err;
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pHandle->pBackendFile = pFile;
pHandle->checkpointId = 0;
pHandle->checkpointId = chkpId;
pHandle->seraial = 0;
pFile->path = taosStrdup(path);
pFile->path = tdir;
pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry;
......@@ -117,7 +147,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
continue;
}
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pFile->pMainfest = taosStrdup(name);
pFile->pOptions = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
......@@ -134,7 +164,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) {
qError("stream-state failed to open %s, reason: no valid file", tdir);
code = -1;
tdir = NULL;
goto _err;
}
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
......@@ -143,50 +175,45 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
// current
item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
taosStatFile(pFile->pCurrent, &item.size, NULL, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// mainfest
item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
taosStatFile(pFile->pMainfest, &item.size, NULL, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// options
item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
taosStatFile(pFile->pOptions, &item.size, NULL, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
taosStatFile(sst, &item.size, NULL, NULL);
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
}
// meta
item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL, NULL);
if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
taosArrayPush(list, &item);
}
pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->pFileList = list;
char fullname[256] = {0};
char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
if (pHandle->fd == NULL) {
goto _err;
}
pHandle->seraial = 0;
pHandle->offset = 0;
return 0;
_err:
streamSnapHandleDestroy(pHandle);
taosMemoryFreeClear(tdir);
code = -1;
return code;
......@@ -194,6 +221,7 @@ _err:
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile;
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
......@@ -205,20 +233,21 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
}
taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile);
}
taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd);
return;
}
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) {
int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) {
// impl later
SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader));
if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) {
taosMemoryFree(pReader);
return -1;
}
......@@ -242,17 +271,32 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
*ppData = NULL;
*size = 0;
return 0;
} else {
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
}
qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to read snap, file name:%s, type:%d, reason:%s", item->name, item->type,
qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type,
tstrerror(code));
return code;
// handle later
return -1;
} else if (nread > 0 && nread <= kBlockSize) {
// left bytes less than kBlockSize
qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name,
(int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
......@@ -260,6 +304,11 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->currFileIdx += 1;
}
} else {
qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
if (pHandle->currFileIdx >= taosArrayGetSize(pHandle->pFileList)) {
// finish
*ppData = NULL;
......@@ -267,12 +316,13 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ);
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread;
qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d",
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
......@@ -310,7 +360,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
pHandle->pFileList = list;
pHandle->currFileIdx = 0;
pHandle->offset = 0;
pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter;
return 0;
}
......@@ -321,14 +371,25 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) {
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
}
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code;
}
pHandle->offset += pHdr->size;
pHandle->offset += bytes;
} else {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
......@@ -339,10 +400,13 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
item.type = pHdr->type;
taosArrayPush(pHandle->pFileList, &item);
char fullname[256] = {0};
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name;
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name,
tstrerror(code));
}
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size;
......@@ -367,6 +431,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
qDebug("stream snap get file list, %s", buf);
taosMemoryFree(buf);
}
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name);
......
......@@ -108,7 +108,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
SStreamTask* pStreamTask = pTask;
char statePath[1024];
if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId);
sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
} else {
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
......
......@@ -804,7 +804,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
// sprintf(&key[count - 2], "%c", i);
key[count - 2] = '0' + i;
ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn);
ret = tdbTbInsert(pDb, key, count, NULL, 0, txn);
GTEST_ASSERT_EQ(ret, 0);
}
}
......
......@@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray->size = 0;
}
void taosArrayClearP(SArray* pArray, void (*fp)(void*)) {
// if (pArray == NULL) return;
// if (fp == NULL) {
// pArray->size = 0;
// return;
// }
// for (int32_t i = 0; i < pArray->size; ++i) {
// fp(TARRAY_GET_ELEM(pArray, i));
// }
if (pArray) {
for (int32_t i = 0; i < pArray->size; i++) {
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
}
}
taosArrayClear(pArray);
}
void* taosArrayDestroy(SArray* pArray) {
if (pArray) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册