From 63cc4e27b11d1f2992beccf8d28a0f094e5da2ae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 9 Dec 2022 15:50:22 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/vnodeInt.h | 3 + source/dnode/vnode/src/vnd/vnodeCfg.c | 3 - source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 160 +++++++++++++++------ source/dnode/vnode/src/vnd/vnodeSvr.c | 4 - 5 files changed, 124 insertions(+), 50 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8cf212cb1d..fb15af5fac 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,11 +87,14 @@ typedef struct SCommitInfo SCommitInfo; #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" +#define VND_INFO_FNAME "vnode.json" + // vnd.h void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolRef(SVBufPool* pPool); void vnodeBufPoolUnRef(SVBufPool* pPool); +int vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo); // meta typedef struct SMCtbCursor SMCtbCursor; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 5adb2eb359..b5337a1c7b 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -135,9 +135,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { tjsonAddItemToArray(pNodeInfoArr, pNodeInfo); } - // add tsdb page size config - if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1; - return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index cd56468371..5860ef72fd 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -16,11 +16,9 @@ #include "vnd.h" #include "vnodeInt.h" -#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); -static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeCommitImpl(SCommitInfo *pInfo); int vnodeBegin(SVnode *pVnode) { @@ -407,7 +405,7 @@ _err: return -1; } -static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { +int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { SJson *pJson = NULL; pJson = tjsonParse(pData); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index a34744a1da..742032a2a3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -21,6 +21,8 @@ struct SVSnapReader { int64_t sver; int64_t ever; int64_t index; + // config + int8_t cfgDone; // meta int8_t metaDone; SMetaSnapReader *pMetaReader; @@ -88,6 +90,53 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) { int32_t code = 0; + // CONFIG ============== + // FIXME: if commit multiple times and the config changed? + if (!pReader->cfgDone) { + char fName[TSDB_FILENAME_LEN]; + if (pReader->pVnode->pTfs) { + snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP, + pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME); + } else { + snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME); + } + + TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ); + if (NULL == pFile) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + int64_t size; + if (taosFStatFile(pFile, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFile); + goto _err; + } + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFile); + goto _err; + } + ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG; + ((SSnapDataHdr *)(*ppData))->size = size + 1; + ((SSnapDataHdr *)(*ppData))->data[size] = '\0'; + + if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosMemoryFree(*ppData); + taosCloseFile(&pFile); + goto _err; + } + + taosCloseFile(&pFile); + + pReader->cfgDone = 1; + goto _exit; + } + // META ============== if (!pReader->metaDone) { // open reader if not @@ -230,6 +279,8 @@ struct SVSnapWriter { int64_t ever; int64_t commitID; int64_t index; + // config + SVnodeInfo info; // meta SMetaSnapWriter *pMetaSnapWriter; // tsdb @@ -248,6 +299,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr int32_t code = 0; SVSnapWriter *pWriter = NULL; + // commit memory data + vnodeAsyncCommit(pVnode); + tsem_wait(&pVnode->canCommit); + // alloc pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { @@ -258,16 +313,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr pWriter->sver = sver; pWriter->ever = ever; - // commit it - code = vnodeSyncCommit(pVnode); - if (code) { - taosMemoryFree(pWriter); - goto _err; - } - // inc commit ID - pVnode->state.commitID++; - pWriter->commitID = pVnode->state.commitID; + pWriter->commitID = ++pVnode->state.commitID; vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), sver, ever, pWriter->commitID); @@ -284,53 +331,82 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * int32_t code = 0; SVnode *pVnode = pWriter->pVnode; + if (!rollback) { + pVnode->config = pWriter->info.config; + pVnode->state = (SVState){.committed = pWriter->info.state.committed, + .applied = pWriter->info.state.committed, + .commitID = pWriter->commitID, + .commitTerm = pWriter->info.state.commitTerm, + .applyTerm = pWriter->info.state.commitTerm}; + pVnode->statis = pWriter->info.statis; + char dir[TSDB_FILENAME_LEN] = {0}; + if (pWriter->pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path); + } + + vnodeCommitInfo(dir, &pWriter->info); + } else { + vnodeRollback(pWriter->pVnode); + } + if (pWriter->pMetaSnapWriter) { code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback); - if (code) goto _err; + if (code) goto _exit; } if (pWriter->pTsdbSnapWriter) { code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback); - if (code) goto _err; + if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); - if (code) goto _err; + if (code) goto _exit; } - if (!rollback) { - SVnodeInfo info = {0}; - char dir[TSDB_FILENAME_LEN]; - - pVnode->state.committed = pWriter->ever; - pVnode->state.applied = pWriter->ever; - pVnode->state.applyTerm = pSnapshot->lastApplyTerm; - pVnode->state.commitTerm = pSnapshot->lastApplyTerm; - - info.config = pVnode->config; - info.state.committed = pVnode->state.applied; - info.state.commitTerm = pVnode->state.applyTerm; - info.state.commitID = pVnode->state.commitID; - snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); - code = vnodeSaveInfo(dir, &info); - if (code) goto _err; - - code = vnodeCommitInfo(dir, &info); - if (code) goto _err; - - vnodeBegin(pVnode); - } else { - ASSERT(0); - } + vnodeBegin(pVnode); _exit: - vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); - taosMemoryFree(pWriter); + if (code) { + vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); + } else { + vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback); + taosMemoryFree(pWriter); + } + tsem_wait(&pVnode->canCommit); return code; +} -_err: - vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code)); +static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { + int32_t code = 0; + + SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; + + // decode info + if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + // change some value + pWriter->info.state.commitID = pWriter->commitID; + + // modify info as needed + char dir[TSDB_FILENAME_LEN] = {0}; + if (pWriter->pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP, + pWriter->pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path); + } + if (vnodeSaveInfo(dir, &pWriter->info) < 0) { + code = terrno; + goto _exit; + } + +_exit: return code; } @@ -347,6 +423,10 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { pHdr->type, nData); switch (pHdr->type) { + case SNAP_DATA_CFG: { + code = vnodeSnapWriteInfo(pWriter, pData, nData); + if (code) goto _err; + } break; case SNAP_DATA_META: { // meta if (pWriter->pMetaSnapWriter == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0fc42f3744..6092888136 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -317,11 +317,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // commit if need if (vnodeShouldCommit(pVnode)) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); -#if 0 - vnodeSyncCommit(pVnode); -#else vnodeAsyncCommit(pVnode); -#endif // start a new one if (vnodeBegin(pVnode) < 0) { -- GitLab