提交 63cc4e27 编写于 作者: H Hongze Cheng

more code

上级 0707de66
...@@ -87,11 +87,14 @@ typedef struct SCommitInfo SCommitInfo; ...@@ -87,11 +87,14 @@ typedef struct SCommitInfo SCommitInfo;
#define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VND_INFO_FNAME "vnode.json"
// vnd.h // vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool);
int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
......
...@@ -135,9 +135,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { ...@@ -135,9 +135,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
tjsonAddItemToArray(pNodeInfoArr, pNodeInfo); tjsonAddItemToArray(pNodeInfoArr, pNodeInfo);
} }
// add tsdb page size config
if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1;
return 0; return 0;
} }
......
...@@ -16,11 +16,9 @@ ...@@ -16,11 +16,9 @@
#include "vnd.h" #include "vnd.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeCommitImpl(SCommitInfo *pInfo); static int vnodeCommitImpl(SCommitInfo *pInfo);
int vnodeBegin(SVnode *pVnode) { int vnodeBegin(SVnode *pVnode) {
...@@ -407,7 +405,7 @@ _err: ...@@ -407,7 +405,7 @@ _err:
return -1; return -1;
} }
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
SJson *pJson = NULL; SJson *pJson = NULL;
pJson = tjsonParse(pData); pJson = tjsonParse(pData);
......
...@@ -21,6 +21,8 @@ struct SVSnapReader { ...@@ -21,6 +21,8 @@ struct SVSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int64_t index; int64_t index;
// config
int8_t cfgDone;
// meta // meta
int8_t metaDone; int8_t metaDone;
SMetaSnapReader *pMetaReader; SMetaSnapReader *pMetaReader;
...@@ -88,6 +90,53 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { ...@@ -88,6 +90,53 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
if (!pReader->cfgDone) {
char fName[TSDB_FILENAME_LEN];
if (pReader->pVnode->pTfs) {
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP,
pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
} else {
snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
}
TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
if (NULL == pFile) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
int64_t size;
if (taosFStatFile(pFile, &size, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile);
goto _err;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFile);
goto _err;
}
((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
((SSnapDataHdr *)(*ppData))->size = size + 1;
((SSnapDataHdr *)(*ppData))->data[size] = '\0';
if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(*ppData);
taosCloseFile(&pFile);
goto _err;
}
taosCloseFile(&pFile);
pReader->cfgDone = 1;
goto _exit;
}
// META ============== // META ==============
if (!pReader->metaDone) { if (!pReader->metaDone) {
// open reader if not // open reader if not
...@@ -230,6 +279,8 @@ struct SVSnapWriter { ...@@ -230,6 +279,8 @@ struct SVSnapWriter {
int64_t ever; int64_t ever;
int64_t commitID; int64_t commitID;
int64_t index; int64_t index;
// config
SVnodeInfo info;
// meta // meta
SMetaSnapWriter *pMetaSnapWriter; SMetaSnapWriter *pMetaSnapWriter;
// tsdb // tsdb
...@@ -248,6 +299,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -248,6 +299,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t code = 0; int32_t code = 0;
SVSnapWriter *pWriter = NULL; SVSnapWriter *pWriter = NULL;
// commit memory data
vnodeAsyncCommit(pVnode);
tsem_wait(&pVnode->canCommit);
// alloc // alloc
pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) { if (pWriter == NULL) {
...@@ -258,16 +313,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -258,16 +313,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
// commit it
code = vnodeSyncCommit(pVnode);
if (code) {
taosMemoryFree(pWriter);
goto _err;
}
// inc commit ID // inc commit ID
pVnode->state.commitID++; pWriter->commitID = ++pVnode->state.commitID;
pWriter->commitID = pVnode->state.commitID;
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
sver, ever, pWriter->commitID); sver, ever, pWriter->commitID);
...@@ -284,53 +331,82 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * ...@@ -284,53 +331,82 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
if (!rollback) {
pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed,
.applied = pWriter->info.state.committed,
.commitID = pWriter->commitID,
.commitTerm = pWriter->info.state.commitTerm,
.applyTerm = pWriter->info.state.commitTerm};
pVnode->statis = pWriter->info.statis;
char dir[TSDB_FILENAME_LEN] = {0};
if (pWriter->pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
vnodeCommitInfo(dir, &pWriter->info);
} else {
vnodeRollback(pWriter->pVnode);
}
if (pWriter->pMetaSnapWriter) { if (pWriter->pMetaSnapWriter) {
code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback); code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
if (code) goto _err; if (code) goto _exit;
} }
if (pWriter->pTsdbSnapWriter) { if (pWriter->pTsdbSnapWriter) {
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback); code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
if (code) goto _err; if (code) goto _exit;
} }
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _err; if (code) goto _exit;
} }
if (!rollback) { vnodeBegin(pVnode);
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
pVnode->state.committed = pWriter->ever;
pVnode->state.applied = pWriter->ever;
pVnode->state.applyTerm = pSnapshot->lastApplyTerm;
pVnode->state.commitTerm = pSnapshot->lastApplyTerm;
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
code = vnodeSaveInfo(dir, &info);
if (code) goto _err;
code = vnodeCommitInfo(dir, &info);
if (code) goto _err;
vnodeBegin(pVnode);
} else {
ASSERT(0);
}
_exit: _exit:
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); if (code) {
taosMemoryFree(pWriter); vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
} else {
vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
taosMemoryFree(pWriter);
}
tsem_wait(&pVnode->canCommit);
return code; return code;
}
_err: static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); int32_t code = 0;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
// decode info
if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
// change some value
pWriter->info.state.commitID = pWriter->commitID;
// modify info as needed
char dir[TSDB_FILENAME_LEN] = {0};
if (pWriter->pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP,
pWriter->pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
if (vnodeSaveInfo(dir, &pWriter->info) < 0) {
code = terrno;
goto _exit;
}
_exit:
return code; return code;
} }
...@@ -347,6 +423,10 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { ...@@ -347,6 +423,10 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
pHdr->type, nData); pHdr->type, nData);
switch (pHdr->type) { switch (pHdr->type) {
case SNAP_DATA_CFG: {
code = vnodeSnapWriteInfo(pWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_META: { case SNAP_DATA_META: {
// meta // meta
if (pWriter->pMetaSnapWriter == NULL) { if (pWriter->pMetaSnapWriter == NULL) {
......
...@@ -317,11 +317,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -317,11 +317,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
#if 0
vnodeSyncCommit(pVnode);
#else
vnodeAsyncCommit(pVnode); vnodeAsyncCommit(pVnode);
#endif
// start a new one // start a new one
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册