/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "vnd.h"
#include "vnodeInt.h"

#define VND_INFO_FNAME_TMP "vnode_tmp.json"

static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);

#define WAIT_TIME_MILI_SEC 10  // milliseconds

/*
 * Build the directory that holds the vnode info files into `dir`.
 *
 * When the vnode has a tfs handle the directory is
 * "<primary path><TD_DIRSEP><vnode path>", otherwise it is just the vnode
 * path. The output is truncated to `size` bytes by snprintf.
 */
static void vnodeCommitGetDir(SVnode *pVnode, char *dir, size_t size) {
  if (pVnode->pTfs) {
    snprintf(dir, size, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  } else {
    snprintf(dir, size, "%s", pVnode->path);
  }
}

/*
 * Start a new write round on the vnode: take a buffer pool from the free
 * list (waiting on `poolNotEmpty` in WAIT_TIME_MILI_SEC slices while the list
 * is empty), bump the commit ID, then begin meta, tsdb and (for rsma vnodes)
 * sma.
 *
 * Returns 0 on success, -1 on failure (terrno is set for the wait failure;
 * the sub-system failures are logged with the current terrno).
 */
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool
  taosThreadMutexLock(&pVnode->mutex);

  int32_t nTry = 0;
  for (;;) {
    while (pVnode->freeList == NULL) {
      // NOTE(review): nTry is incremented inside the log call; if vDebug is a
      // macro that skips argument evaluation when debug logging is off, nTry
      // never advances. Left unchanged because it decides when the
      // ASSERT(false) below fires -- confirm the intended behavior.
      vDebug("vgId:%d no free buffer pool, try to wait %d...", TD_VID(pVnode), ++nTry);

      struct timeval  tv;
      struct timespec ts;
      taosGetTimeOfDay(&tv);

      // Carry nanosecond overflow into seconds: tv_usec * 1000 plus the wait
      // slice can reach or exceed one second, and a tv_nsec >= 1000000000 is
      // an invalid absolute time for a timed wait.
      int64_t nsec = (int64_t)tv.tv_usec * 1000 + (int64_t)WAIT_TIME_MILI_SEC * 1000000;
      ts.tv_sec = tv.tv_sec + nsec / 1000000000;
      ts.tv_nsec = nsec % 1000000000;

      int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
      if (rc == ETIMEDOUT) {  // time out, break
        break;
      } else if (rc != 0) {  // error occurred
        terrno = TAOS_SYSTEM_ERROR(rc);
        vError("vgId:%d %s failed since %s", TD_VID(pVnode), __func__, tstrerror(terrno));
        taosThreadMutexUnlock(&pVnode->mutex);
        return -1;
      }
    }

    if (pVnode->freeList) {
      // allocate from free list
      pVnode->inUse = pVnode->freeList;
      pVnode->inUse->nRef = 1;
      pVnode->freeList = pVnode->inUse->freeNext;
      pVnode->inUse->freeNext = NULL;
      break;
    } else if (nTry == 1) {
      // try to recycle a buffer pool
      vInfo("vgId:%d no free buffer pool, try to recycle...", TD_VID(pVnode));
      ASSERT(false);  // TODO: condition of nTry == 1 may not be reasonable
    }
  }

  taosThreadMutexUnlock(&pVnode->mutex);

  pVnode->state.commitID++;

  // begin meta
  if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin sma
  if (VND_IS_RSMA(pVnode) && smaBegin(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to begin sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  return 0;
}

/*
 * Report whether the vnode should commit now: non-zero when a buffer pool is
 * in use, data space is available, and the pool's `size` exceeds its
 * `node.size`. Returns 0 when no pool is in use.
 */
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }
  return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
}

/*
 * Encode `pInfo` as JSON and write it, followed by an fsync, to the temporary
 * info file VND_INFO_FNAME_TMP under `dir`.
 *
 * Returns 0 on success, -1 on failure. On file errors terrno is set from
 * errno. The encoded buffer is released on every path.
 */
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile;
  char     *data;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  // encode info
  data = NULL;

  if (vnodeEncodeInfo(pInfo, &data) < 0) {
    vError("failed to encode json info.");
    return -1;
  }

  // save info to a vnode_tmp.json
  pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    // capture errno before logging so the log call cannot clobber it
    terrno = TAOS_SYSTEM_ERROR(errno);
    vError("failed to open info file:%s for write:%s", fname, terrstr());
    taosMemoryFree(data);  // the encoded buffer is owned by this function
    return -1;
  }

  if (taosWriteFile(pFile, data, strlen(data)) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    vError("failed to write info file:%s error:%s", fname, terrstr());
    goto _err;
  }

  if (taosFsyncFile(pFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    vError("failed to fsync info file:%s error:%s", fname, terrstr());
    goto _err;
  }

  taosCloseFile(&pFile);

  // free info binary
  taosMemoryFree(data);

  vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
        pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(data);
  return -1;
}

/*
 * Commit the vnode info by renaming the temporary info file
 * (VND_INFO_FNAME_TMP) under `dir` to the final one (VND_INFO_FNAME).
 *
 * Returns 0 on success; on rename failure sets terrno from errno and
 * returns -1.
 */
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
  char fname[TSDB_FILENAME_LEN];
  char tfname[TSDB_FILENAME_LEN];

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
  snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  if (taosRenameFile(tfname, fname) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);

  return 0;
}

/*
 * Read the info file VND_INFO_FNAME under `dir` into memory and decode it
 * into `pInfo`.
 *
 * Returns 0 on success, -1 on failure (terrno is set for open/stat/read and
 * allocation failures).
 */
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      fname[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pData = NULL;
  int64_t   size;

  snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  pFile = taosOpenFile(fname, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &size, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // one extra byte for the terminating NUL appended after reading
  pData = taosMemoryMalloc(size + 1);
  if (pData == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosReadFile(pFile, pData, size) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  pData[size] = '\0';

  taosCloseFile(&pFile);

  // decode info
  if (vnodeDecodeInfo(pData, pInfo) < 0) {
    taosMemoryFree(pData);
    return -1;
  }

  taosMemoryFree(pData);

  return 0;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(pData);
  return -1;
}

/*
 * Prepare a commit: wait on `canCommit`, snapshot the vnode config/state into
 * `pInfo`, save the info to the temporary info file, run the prepare step of
 * tsdb/meta/sma, and release the in-use buffer pool.
 *
 * Note that `canCommit` has been waited on when this function returns,
 * whether it succeeded or not; the caller is responsible for posting it.
 *
 * Returns 0 on success or an error code.
 */
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;
  char    dir[TSDB_FILENAME_LEN] = {0};

  tsem_wait(&pVnode->canCommit);

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  pInfo->info.config = pVnode->config;
  pInfo->info.state.committed = pVnode->state.applied;
  pInfo->info.state.commitTerm = pVnode->state.applyTerm;
  pInfo->info.state.commitID = pVnode->state.commitID;
  pInfo->pVnode = pVnode;
  pInfo->txn = metaGetTxn(pVnode->pMeta);

  // save info
  vnodeCommitGetDir(pVnode, dir, sizeof(dir));

  vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
  if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  tsdbPrepareCommit(pVnode->pTsdb);

  metaPrepareAsyncCommit(pVnode->pMeta);

  code = smaPrepareAsyncCommit(pVnode->pSma);
  TSDB_CHECK_CODE(code, lino, _exit);

  vnodeBufPoolUnRef(pVnode->inUse);
  pVnode->inUse = NULL;

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
           tstrerror(code), pVnode->state.commitID);
  } else {
    vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
  }

  return code;
}

/*
 * Scheduled task body for an asynchronous commit: run the commit, post
 * `canCommit`, and free the commit info (`arg` is an SCommitInfo owned by
 * this task).
 *
 * Returns the result of vnodeCommitImpl.
 */
static int32_t vnodeCommitTask(void *arg) {
  SCommitInfo *pInfo = (SCommitInfo *)arg;

  // commit
  int32_t code = vnodeCommitImpl(pInfo);

  // end commit
  tsem_post(&pInfo->pVnode->canCommit);
  taosMemoryFree(pInfo);

  return code;
}

/*
 * Prepare a commit and schedule it to run as a task.
 *
 * On success the scheduled task owns the commit info and posts `canCommit`
 * when it finishes. On failure after the prepare step started, the commit
 * info is freed here and `canCommit` is posted to undo the wait done by
 * vnodePrepareCommit.
 *
 * Returns 0 on success or an error code.
 */
int vnodeAsyncCommit(SVnode *pVnode) {
  int32_t code = 0;

  SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
  if (NULL == pInfo) {
    // canCommit has not been waited on yet, so it must not be posted here
    code = TSDB_CODE_OUT_OF_MEMORY;
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
    return code;
  }

  // prepare to commit
  code = vnodePrepareCommit(pVnode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // schedule the task
  code = vnodeScheduleTask(vnodeCommitTask, pInfo);

_exit:
  if (code) {
    taosMemoryFree(pInfo);
    tsem_post(&pVnode->canCommit);
    vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
           pVnode->state.commitID);
  } else {
    vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode),
          pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied);
  }
  return code;
}

/*
 * Start an asynchronous commit and block until `canCommit` can be acquired
 * again, then release it immediately.
 *
 * Always returns 0.
 */
int vnodeSyncCommit(SVnode *pVnode) {
  // NOTE(review): the result of vnodeAsyncCommit is discarded and 0 is always
  // returned; confirm whether callers should see the failure.
  (void)vnodeAsyncCommit(pVnode);
  tsem_wait(&pVnode->canCommit);
  tsem_post(&pVnode->canCommit);
  return 0;
}

/*
 * Perform the commit described by `pInfo`: persist the WAL, commit tsdb,
 * sma (rsma vnodes only) and tq, rename the temporary info file into place,
 * finish the tsdb/sma/meta commits, and record the committed version on the
 * vnode.
 *
 * Returns 0 on success; -1 when WAL persistence or the sma post-commit
 * fails; otherwise the error code of the step that failed.
 */
static int vnodeCommitImpl(SCommitInfo *pInfo) {
  int32_t code = 0;
  int32_t lino = 0;

  char    dir[TSDB_FILENAME_LEN] = {0};
  SVnode *pVnode = pInfo->pVnode;

  vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
        pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);

  // persist wal before starting
  if (walPersist(pVnode->pWal) < 0) {
    vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
    return -1;
  }

  vnodeCommitGetDir(pVnode, dir, sizeof(dir));

  // walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
  syncBeginSnapshot(pVnode->sync, pVnode->state.applied);

  // commit each sub-system
  code = tsdbCommit(pVnode->pTsdb, pInfo);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaCommit(pVnode->pSma, pInfo);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (tqCommit(pVnode->pTq) < 0) {
    code = TSDB_CODE_FAILED;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  // commit info
  if (vnodeCommitInfo(dir, &pInfo->info) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  code = tsdbFinishCommit(pVnode->pTsdb);
  TSDB_CHECK_CODE(code, lino, _exit);

  if (VND_IS_RSMA(pVnode)) {
    code = smaFinishCommit(pVnode->pSma);
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  if (metaFinishCommit(pVnode->pMeta, pInfo->txn) < 0) {
    code = terrno;
    TSDB_CHECK_CODE(code, lino, _exit);
  }

  pVnode->state.committed = pInfo->info.state.committed;

  if (smaPostCommit(pVnode->pSma) < 0) {
    vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // walEndSnapshot(pVnode->pWal);
  syncEndSnapshot(pVnode->sync);

_exit:
  if (code) {
    vError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
  } else {
    vInfo("vgId:%d, commit end", TD_VID(pVnode));
  }
  // propagate the failure instead of reporting success after logging an error
  return code;
}

/*
 * Report whether a rollback is needed: true when the temporary info file
 * (VND_INFO_FNAME_TMP) exists in the vnode directory.
 */
bool vnodeShouldRollback(SVnode *pVnode) {
  char dir[TSDB_FILENAME_LEN] = {0};
  char tFName[TSDB_FILENAME_LEN] = {0};

  vnodeCommitGetDir(pVnode, dir, sizeof(dir));
  snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  return taosCheckExistFile(tFName);
}

/*
 * Roll back by removing the temporary info file (VND_INFO_FNAME_TMP) from the
 * vnode directory. The result of the removal is deliberately ignored.
 */
void vnodeRollback(SVnode *pVnode) {
  char dir[TSDB_FILENAME_LEN] = {0};
  char tFName[TSDB_FILENAME_LEN] = {0};

  vnodeCommitGetDir(pVnode, dir, sizeof(dir));
  snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);

  (void)taosRemoveFile(tFName);
}

/*
 * JSON encoder callback for SVState: writes "commit version", "commit ID"
 * and "commit term". Returns 0 on success, -1 if any field cannot be added.
 */
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = (SVState *)pObj;

  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
  if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1;

  return 0;
}

/*
 * JSON decoder callback for SVState: reads "commit version", "commit ID" and
 * "commit term". Returns 0 on success, -1 if any field fails to decode.
 */
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;

  int32_t code;
  tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
  if (code < 0) return -1;
  tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
  if (code < 0) return -1;

  return 0;
}

/*
 * Serialize `pInfo` ("config" and "state" objects) to a JSON string.
 *
 * On success stores the string in `*ppData` and returns 0; the caller frees
 * it (vnodeSaveInfo releases it with taosMemoryFree). On failure `*ppData`
 * is NULL and -1 is returned.
 */
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  SJson *pJson;
  char  *pData;

  *ppData = NULL;

  pJson = tjsonCreateObject();
  if (pJson == NULL) {
    return -1;
  }

  if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) < 0) {
    goto _err;
  }

  if (tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) < 0) {
    goto _err;
  }

  pData = tjsonToString(pJson);
  if (pData == NULL) {
    goto _err;
  }

  tjsonDelete(pJson);

  *ppData = pData;
  return 0;

_err:
  tjsonDelete(pJson);
  return -1;
}

/*
 * Parse the JSON text in `pData` and decode its "config" and "state" objects
 * into `pInfo`. Returns 0 on success, -1 on parse or decode failure.
 */
int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  SJson *pJson = NULL;

  pJson = tjsonParse(pData);
  if (pJson == NULL) {
    return -1;
  }

  if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) {
    goto _err;
  }

  if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) {
    goto _err;
  }

  tjsonDelete(pJson);

  return 0;

_err:
  tjsonDelete(pJson);
  return -1;
}