提交 0330f3b2 编写于 作者: Y yihaoDeng

add checkpoint

上级 e36de5ed
......@@ -29,6 +29,6 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
// SMetaSnapWriter ========================================
int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSnapWriter** ppWriter);
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamSnapWriterClose(SStreamSnapWriter** ppWriter, int8_t rollback);
int32_t streamSnapWriterClose(SStreamSnapWriter* ppWriter, int8_t rollback);
#endif
\ No newline at end of file
......@@ -152,7 +152,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) {
handle->currFileIdx = 0;
handle->pFileList = list;
handle->fd = taosOpenFile(taosArrayGetP(handle->pFileList, handle->currFileIdx), TD_FILE_READ);
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_READ);
if (handle->fd == NULL) {
goto _err;
}
......@@ -213,7 +213,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosReadFile(pHandle->fd, buf, kBlockSize);
int64_t nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize);
if (nread == -1) {
// handle later
return -1;
......@@ -227,7 +227,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
pHandle->fd = taosOpenFile(item->name, TD_FILE_READ);
// handle err later
nread = taosReadFile(pHandle->fd, buf, kBlockSize);
nread = taosReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize);
}
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf;
......@@ -248,47 +248,58 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna
if (pWriter == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
const char* path = NULL;
// if (streamSnapHandleInit(&pWriter->handle, (char*)path) < 0) {
// return -1;
// }
SStreamSnapHandle* handle = &pWriter->handle;
const char* path = NULL;
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pFile->path = taosStrdup(path);
pFile->pSst = taosArrayInit(16, sizeof(void*));
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
// SBackendFileItem item;
// // current
// item.name = (char*)ROCKSDB_CURRENT;
// item.type = ROCKSDB_CURRENT_TYPE;
// taosArrayPush(list, &item);
// // mainfest
// item.name = pFile->pMainfest;
// item.type = ROCKSDB_MAINFEST_TYPE;
// taosArrayPush(list, &item);
// // options
// item.name = pFile->pOptions;
// item.type = ROCKSDB_OPTIONS_TYPE;
// taosArrayPush(list, &item);
// // sst
// for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
// char* sst = taosArrayGetP(pFile->pSst, i);
// item.name = sst;
// item.type = ROCKSDB_SST_TYPE;
// taosArrayPush(list, &item);
// }
// // meta
// item.name = pFile->pCheckpointMeta;
// item.type = ROCKSDB_CHECKPOINT_META_TYPE;
// taosArrayPush(list, &item);
SBackendFileItem item;
item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(list, &item);
handle->pBackendFile = pFile;
handle->pFileList = list;
handle->currFileIdx = 0;
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE);
*ppWriter = pWriter;
return 0;
}
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* handle = &pWriter->handle;
SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx);
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
taosWriteFile(handle->fd, pHdr->data, pHdr->size);
} else {
taosCloseFile(&handle->fd);
SBackendFileItem item;
item.name = taosStrdup(pHdr->name);
item.type = pHdr->type;
taosArrayPush(handle->pFileList, &item);
handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE);
handle->currFileIdx += 1;
}
// impl later
return 0;
}
int32_t streamSnapWriterClose(SStreamSnapWriter** ppWriter, int8_t rollback) { return 0; }
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle;
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);
taosMemoryFree(item->name);
}
streamSnapHandleDestroy(handle);
taosMemoryFree(pWriter);
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册