提交 2c1fc501 编写于 作者: dengyihao's avatar dengyihao

add checkpoint

上级 9b451bc5
......@@ -81,7 +81,14 @@ static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
// static void streamBuildFname(char* path, char* file, char* fullname)
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
do { \
sprintf(fullname, "%s/%s", path, file); \
} while (0)
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) {
// impl later
int32_t code = 0;
......@@ -91,8 +98,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
handle->checkpointId = 0;
handle->seraial = 0;
pHandle->checkpointId = 0;
pHandle->seraial = 0;
pFile->path = taosStrdup(path);
pFile->pSst = taosArrayInit(16, sizeof(void*));
......@@ -156,19 +163,24 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
taosStatFile(pFile->pCheckpointMeta, &item.size, NULL);
taosArrayPush(list, &item);
handle->pBackendFile = pFile;
pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->pFileList = list;
handle->currFileIdx = 0;
handle->pFileList = list;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_READ);
if (handle->fd == NULL) {
char fullname[256] = {0};
char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
if (pHandle->fd == NULL) {
goto _err;
}
handle->seraial = 0;
handle->offset = 0;
pHandle->seraial = 0;
pHandle->offset = 0;
return 0;
_err:
streamSnapHandleDestroy(handle);
streamSnapHandleDestroy(pHandle);
code = -1;
return code;
......@@ -219,7 +231,9 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
// impl later
int32_t code = 0;
SStreamSnapHandle* pHandle = &pReader->handle;
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
......@@ -234,6 +248,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
pHandle->offset += nread;
if (pHandle->offset >= item->size || nread < kBlockSize) {
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
}
} else {
......@@ -242,7 +257,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
return 0;
}
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = taosOpenFile(item->name, TD_FILE_READ);
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_READ);
nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
pHandle->offset += nread;
}
......@@ -266,7 +284,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna
if (pWriter == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SStreamSnapHandle* handle = &pWriter->handle;
SStreamSnapHandle* pHandle = &pWriter->handle;
const char* path = NULL;
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
......@@ -278,12 +296,12 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(list, &item);
handle->pBackendFile = pFile;
pHandle->pBackendFile = pFile;
handle->pFileList = list;
handle->currFileIdx = 0;
handle->offset = 0;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE);
pHandle->pFileList = list;
pHandle->currFileIdx = 0;
pHandle->offset = 0;
pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter;
return 0;
}
......@@ -292,29 +310,33 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
int32_t code = 0;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* handle = &pWriter->handle;
SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx);
SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile;
SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
if (taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset) != pHdr->size) {
if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno);
qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code));
return code;
}
handle->offset += pHdr->size;
pHandle->offset += pHdr->size;
} else {
taosCloseFile(&handle->fd);
taosCloseFile(&pHandle->fd);
pHandle->offset = 0;
pHandle->currFileIdx += 1;
SBackendFileItem item;
item.name = taosStrdup(pHdr->name);
item.type = pHdr->type;
taosArrayPush(handle->pFileList, &item);
taosArrayPush(pHandle->pFileList, &item);
handle->offset = 0;
handle->currFileIdx += 1;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE);
char fullname[256] = {0};
char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name;
STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname);
pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE);
taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset);
handle->offset += pHdr->size;
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset);
pHandle->offset += pHdr->size;
}
// impl later
......@@ -322,6 +344,20 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle;
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = (char*)taosMemoryMalloc(1024);
int n = sprintf(buf, "[");
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
if (i != taosArrayGetSize(handle->pFileList) - 1) {
n += sprintf(buf + n, "%s %" PRId64 ",", item->name, item->size);
} else {
n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size);
}
}
qDebug("stream snap get file list, %s", buf);
taosMemoryFree(buf);
}
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册