未验证 提交 4223b43a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17322 from taosdata/FIX/TD-19239-3.0

fix: enhance subroutines to resolve WAL inconsistency after crashing or disk full
...@@ -43,6 +43,7 @@ extern "C" { ...@@ -43,6 +43,7 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
#define WAL_RECOV_SIZE_LIMIT (100 * WAL_SCAN_BUF_SIZE)
typedef enum { typedef enum {
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
......
...@@ -49,9 +49,15 @@ extern SDiskSpace tsTempSpace; ...@@ -49,9 +49,15 @@ extern SDiskSpace tsTempSpace;
void osDefaultInit(); void osDefaultInit();
void osUpdate(); void osUpdate();
void osCleanup(); void osCleanup();
bool osLogSpaceAvailable(); bool osLogSpaceAvailable();
bool osDataSpaceAvailable(); bool osDataSpaceAvailable();
bool osTempSpaceAvailable(); bool osTempSpaceAvailable();
bool osLogSpaceSufficient();
bool osDataSpaceSufficient();
bool osTempSpaceSufficient();
void osSetTimezone(const char *timezone); void osSetTimezone(const char *timezone);
void osSetSystemLocale(const char *inLocale, const char *inCharSet); void osSetSystemLocale(const char *inLocale, const char *inCharSet);
...@@ -59,4 +65,4 @@ void osSetSystemLocale(const char *inLocale, const char *inCharSet); ...@@ -59,4 +65,4 @@ void osSetSystemLocale(const char *inLocale, const char *inCharSet);
} }
#endif #endif
#endif /*_TD_OS_ENV_H_*/ #endif /*_TD_OS_ENV_H_*/
\ No newline at end of file
...@@ -450,6 +450,7 @@ int32_t* taosGetErrno(); ...@@ -450,6 +450,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003) #define TSDB_CODE_WAL_INVALID_VER TAOS_DEF_ERROR_CODE(0, 0x1003)
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004)
#define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005) #define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005)
#define TSDB_CODE_WAL_CHKSUM_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x1006)
// tfs // tfs
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201)
......
...@@ -146,7 +146,7 @@ void taos_close(TAOS *taos) { ...@@ -146,7 +146,7 @@ void taos_close(TAOS *taos) {
int taos_errno(TAOS_RES *res) { int taos_errno(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
return terrno; return terrno;
} }
...@@ -154,13 +154,12 @@ int taos_errno(TAOS_RES *res) { ...@@ -154,13 +154,12 @@ int taos_errno(TAOS_RES *res) {
return 0; return 0;
} }
return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_RPC_NETWORK_UNAVAIL return ((SRequestObj *)res)->code == TSDB_CODE_RPC_REDIRECT ? TSDB_CODE_QRY_NOT_READY : ((SRequestObj *)res)->code;
: ((SRequestObj *)res)->code;
} }
const char *taos_errstr(TAOS_RES *res) { const char *taos_errstr(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ_META(res)) { if (res == NULL || TD_RES_TMQ_META(res)) {
if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL; if (terrno == TSDB_CODE_RPC_REDIRECT) terrno = TSDB_CODE_QRY_NOT_READY;
return (const char *)tstrerror(terrno); return (const char *)tstrerror(terrno);
} }
...@@ -172,7 +171,7 @@ const char *taos_errstr(TAOS_RES *res) { ...@@ -172,7 +171,7 @@ const char *taos_errstr(TAOS_RES *res) {
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) { if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
return pRequest->msgBuf; return pRequest->msgBuf;
} else { } else {
return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_RPC_NETWORK_UNAVAIL) return pRequest->code == TSDB_CODE_RPC_REDIRECT ? (const char *)tstrerror(TSDB_CODE_QRY_NOT_READY)
: (const char *)tstrerror(pRequest->code); : (const char *)tstrerror(pRequest->code);
} }
} }
......
...@@ -51,26 +51,14 @@ static int32_t dmInitMonitor() { ...@@ -51,26 +51,14 @@ static int32_t dmInitMonitor() {
static bool dmCheckDiskSpace() { static bool dmCheckDiskSpace() {
osUpdate(); osUpdate();
if (!osDataSpaceAvailable()) { if (!osDataSpaceSufficient()) {
dError("free disk size: %f GB, too little, require %f GB at least at least , quit", dWarn("free data disk size: %f GB, not sufficient, expected %f GB at least", (double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
(double)tsDataSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
(double)tsDataSpace.reserved / 1024.0 / 1024.0 / 1024.0);
terrno = TSDB_CODE_NO_AVAIL_DISK;
return false;
} }
if (!osLogSpaceAvailable()) { if (!osLogSpaceSufficient()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", dWarn("free log disk size: %f GB, not sufficient, expected %f GB at least", (double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
(double)tsLogSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
(double)tsLogSpace.reserved / 1024.0 / 1024.0 / 1024.0);
terrno = TSDB_CODE_NO_AVAIL_DISK;
return false;
} }
if (!osTempSpaceAvailable()) { if (!osTempSpaceSufficient()) {
dError("free disk size: %f GB, too little, require %f GB at least at least, quit", dWarn("free temp disk size: %f GB, not sufficient, expected %f GB at least", (double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0, (double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
(double)tsTempSpace.size.avail / 1024.0 / 1024.0 / 1024.0,
(double)tsTempSpace.reserved / 1024.0 / 1024.0 / 1024.0);
terrno = TSDB_CODE_NO_AVAIL_DISK;
return false;
} }
return true; return true;
} }
......
...@@ -232,7 +232,7 @@ static int32_t mndInitWal(SMnode *pMnode) { ...@@ -232,7 +232,7 @@ static int32_t mndInitWal(SMnode *pMnode) {
pMnode->pWal = walOpen(path, &cfg); pMnode->pWal = walOpen(path, &cfg);
if (pMnode->pWal == NULL) { if (pMnode->pWal == NULL) {
mError("failed to open wal since %s", terrstr()); mError("failed to open wal since %s. wal:%s", terrstr(), path);
return -1; return -1;
} }
......
...@@ -1045,7 +1045,9 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { ...@@ -1045,7 +1045,9 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
ASSERT(eno == 0); ASSERT(eno == 0 &&
"tsdbCommit failure"
"Restart taosd");
code = tsdbFSCommit1(pTsdb, &pCommitter->fs); code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -160,6 +160,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -160,6 +160,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
goto _err; goto _err;
} }
ASSERT(pPool != NULL);
// do delete // do delete
SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData)); SDelData *pDelData = (SDelData *)vnodeBufPoolMalloc(pPool, sizeof(*pDelData));
if (pDelData == NULL) { if (pDelData == NULL) {
...@@ -353,6 +354,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid ...@@ -353,6 +354,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel; int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
ASSERT(pPool != NULL);
pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2); pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
if (pTbData == NULL) { if (pTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -492,6 +494,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN ...@@ -492,6 +494,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
// node // node
level = tsdbMemSkipListRandLevel(&pTbData->sl); level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
if (pNode == NULL) { if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -112,6 +112,8 @@ void vnodeBufPoolReset(SVBufPool *pPool) { ...@@ -112,6 +112,8 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode; SVBufPoolNode *pNode;
void *p = NULL; void *p = NULL;
ASSERT(pPool != NULL);
taosThreadSpinLock(&pPool->lock); taosThreadSpinLock(&pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node // allocate from the anchor node
......
...@@ -73,7 +73,7 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -73,7 +73,7 @@ int vnodeBegin(SVnode *pVnode) {
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) {
if (pVnode->inUse) { if (pVnode->inUse) {
return pVnode->inUse->size > pVnode->inUse->node.size; return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
} }
return false; return false;
} }
...@@ -89,6 +89,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -89,6 +89,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
data = NULL; data = NULL;
if (vnodeEncodeInfo(pInfo, &data) < 0) { if (vnodeEncodeInfo(pInfo, &data) < 0) {
vError("failed to encode json info.");
return -1; return -1;
} }
...@@ -101,7 +102,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -101,7 +102,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
} }
if (taosWriteFile(pFile, data, strlen(data)) < 0) { if (taosWriteFile(pFile, data, strlen(data)) < 0) {
vError("failed to write info file:%s data:%s", fname, terrstr()); vError("failed to write info file:%s error:%s", fname, terrstr());
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -233,15 +234,15 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -233,15 +234,15 @@ int vnodeCommit(SVnode *pVnode) {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
} }
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
ASSERT(0); vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit // preCommit
// smaSyncPreCommit(pVnode->pSma); // smaSyncPreCommit(pVnode->pSma);
if (smaAsyncPreCommit(pVnode->pSma) < 0) { if(smaAsyncPreCommit(pVnode->pSma) < 0){
ASSERT(0); vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -250,44 +251,44 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -250,44 +251,44 @@ int vnodeCommit(SVnode *pVnode) {
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
if (smaAsyncCommit(pVnode->pSma) < 0) { if (smaAsyncCommit(pVnode->pSma) < 0) {
ASSERT(0); vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA1(pVnode)) < 0) { if (tsdbCommit(VND_RSMA1(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
if (tsdbCommit(VND_RSMA2(pVnode)) < 0) { if (tsdbCommit(VND_RSMA2(pVnode)) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
} else { } else {
if (tsdbCommit(pVnode->pTsdb) < 0) { if (tsdbCommit(pVnode->pTsdb) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
} }
if (tqCommit(pVnode->pTq) < 0) { if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
// walCommit (TODO) // walCommit (TODO)
// commit info // commit info
if (vnodeCommitInfo(dir, &info) < 0) { if (vnodeCommitInfo(dir, &info) < 0) {
ASSERT(0); vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
...@@ -296,7 +297,7 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -296,7 +297,7 @@ int vnodeCommit(SVnode *pVnode) {
// postCommit // postCommit
// smaSyncPostCommit(pVnode->pSma); // smaSyncPostCommit(pVnode->pSma);
if (smaAsyncPostCommit(pVnode->pSma) < 0) { if (smaAsyncPostCommit(pVnode->pSma) < 0) {
ASSERT(0); vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -140,7 +140,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -140,7 +140,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg)); pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
if (pVnode->pWal == NULL) { if (pVnode->pWal == NULL) {
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode wal since %s. wal:%s", TD_VID(pVnode), tstrerror(terrno), tdir);
goto _err; goto _err;
} }
......
...@@ -166,6 +166,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -166,6 +166,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t len; int32_t len;
int32_t ret; int32_t ret;
if (!pVnode->inUse) {
terrno = TSDB_CODE_VND_NOT_SYNCED;
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
return -1;
}
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version); version);
...@@ -286,10 +292,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -286,10 +292,16 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
_do_commit: _do_commit:
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
// commit current change // commit current change
vnodeCommit(pVnode); if (vnodeCommit(pVnode) < 0) {
vError("vgId:%d, failed to commit vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one // start a new one
vnodeBegin(pVnode); if (vnodeBegin(pVnode) < 0) {
vError("vgId:%d, failed to begin vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} }
return 0; return 0;
......
...@@ -579,6 +579,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -579,6 +579,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
...@@ -710,6 +711,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -710,6 +711,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
...@@ -859,6 +861,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -859,6 +861,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
...@@ -974,6 +977,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -974,6 +977,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -76,7 +76,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -76,7 +76,10 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
} else { } else {
pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
ASSERT(pEntry != NULL); if (pEntry == NULL) {
sError("failed to get entry since %s. index:%lld", tstrerror(terrno), index);
return;
}
} }
// cannot commit, even if quorum agree. need check term! // cannot commit, even if quorum agree. need check term!
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
...@@ -127,7 +130,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -127,7 +130,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// execute fsm // execute fsm
if (pSyncNode->pFsm != NULL) { if (pSyncNode->pFsm != NULL) {
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
ASSERT(code == 0); if (code != 0) {
wError("failed to commit sync node since %s", tstrerror(terrno));
}
} }
} }
} }
......
...@@ -2657,7 +2657,10 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ...@@ -2657,7 +2657,10 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
ASSERT(code == 0); if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1;
}
syncNodeReplicate(ths, false); syncNodeReplicate(ths, false);
} }
...@@ -2733,7 +2736,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI ...@@ -2733,7 +2736,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) { if (code != 0) {
// del resp mgr, call FpCommitCb // del resp mgr, call FpCommitCb
ASSERT(0); sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
return -1; return -1;
} }
...@@ -2797,8 +2800,8 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p ...@@ -2797,8 +2800,8 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
// del resp mgr, call FpCommitCb // del resp mgr, call FpCommitCb
ASSERT(0);
return -1; return -1;
} }
...@@ -3050,7 +3053,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -3050,7 +3053,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
} else { } else {
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
ASSERT(code == 0); if (code != 0) {
sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i);
return -1;
}
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
} }
......
...@@ -234,8 +234,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -234,8 +234,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, err, errStr, sysErr, sysErrStr); pEntry->index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf); syncNodeErrorLog(pData->pSyncNode, logBuf);
ASSERT(0);
return -1; return -1;
} }
pEntry->index = index; pEntry->index = index;
...@@ -414,13 +412,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -414,13 +412,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
return 0; return 0;
} }
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncRaftEntry* logStoreGetEntryWithoutLock(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReader* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
...@@ -444,7 +440,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -444,7 +440,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
} }
} while (0); } while (0);
ASSERT(0); sError("failed to read ver since %s. index:%lld", tstrerror(terrno), index);
return NULL;
} }
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
...@@ -465,7 +462,6 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -465,7 +462,6 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
terrno = saveErr; terrno = saveErr;
*/ */
taosThreadMutexUnlock(&(pData->mutex));
return pEntry; return pEntry;
} else { } else {
...@@ -473,6 +469,16 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -473,6 +469,16 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
} }
} }
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SSyncRaftEntry *pEntry = NULL;
taosThreadMutexLock(&pData->mutex);
pEntry = logStoreGetEntryWithoutLock(pLogStore, index);
taosThreadMutexUnlock(&pData->mutex);
return pEntry;
}
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
......
...@@ -121,6 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg ...@@ -121,6 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
ret = tdbPagerWrite(pPager, pPage); ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
...@@ -483,9 +484,8 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN ...@@ -483,9 +484,8 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
ret = tdbPagerWrite(pPager, pChild); ret = tdbPagerWrite(pPager, pChild);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to write page since %s", terrstr());
ASSERT(0); return -1;
return 0;
} }
// Copy the root page content to the child page // Copy the root page content to the child page
...@@ -556,8 +556,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -556,8 +556,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerWrite(pBt->pPager, pOlds[i]); ret = tdbPagerWrite(pBt->pPager, pOlds[i]);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to write page since %s", terrstr());
ASSERT(0);
return -1; return -1;
} }
} }
...@@ -583,8 +582,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -583,8 +582,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerWrite(pBt->pPager, pParent); ret = tdbPagerWrite(pBt->pPager, pParent);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to write page since %s", terrstr());
ASSERT(0);
return -1; return -1;
} }
...@@ -719,8 +717,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -719,8 +717,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
ret = tdbPagerWrite(pBt->pPager, pNews[iNew]); ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to write page since %s", terrstr());
ASSERT(0);
return -1; return -1;
} }
} }
...@@ -937,7 +934,7 @@ static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) ...@@ -937,7 +934,7 @@ static int tdbFetchOvflPage(SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt)
// mark dirty // mark dirty
ret = tdbPagerWrite(pBt->pPager, *ppOfp); ret = tdbPagerWrite(pBt->pPager, *ppOfp);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
...@@ -1942,7 +1939,7 @@ int tdbBtcDelete(SBTC *pBtc) { ...@@ -1942,7 +1939,7 @@ int tdbBtcDelete(SBTC *pBtc) {
// drop the cell on the leaf // drop the cell on the leaf
ret = tdbPagerWrite(pPager, pBtc->pPage); ret = tdbPagerWrite(pPager, pBtc->pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
...@@ -1964,7 +1961,7 @@ int tdbBtcDelete(SBTC *pBtc) { ...@@ -1964,7 +1961,7 @@ int tdbBtcDelete(SBTC *pBtc) {
if (idx < nCells) { if (idx < nCells) {
ret = tdbPagerWrite(pPager, pPage); ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
...@@ -2029,7 +2026,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int ...@@ -2029,7 +2026,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// mark dirty // mark dirty
ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage); ret = tdbPagerWrite(pBtc->pBt->pPager, pBtc->pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
......
...@@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) { ...@@ -106,7 +106,7 @@ int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager, pTxn); ret = tdbPagerBegin(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to begin pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
...@@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) { ...@@ -121,7 +121,7 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager, pTxn); ret = tdbPagerCommit(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
...@@ -136,7 +136,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { ...@@ -136,7 +136,7 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerAbort(pPager, pTxn); ret = tdbPagerAbort(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to abort pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
return -1; return -1;
} }
} }
......
...@@ -156,6 +156,7 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) { ...@@ -156,6 +156,7 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
ret = tdbPagerWrite(pPager, pPage); ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1; return -1;
} }
...@@ -210,7 +211,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -210,7 +211,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
ret = tdbPagerWritePageToJournal(pPager, pPage); ret = tdbPagerWritePageToJournal(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page to journal since %s", tstrerror(terrno));
return -1; return -1;
} }
} }
...@@ -226,6 +227,8 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { ...@@ -226,6 +227,8 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
// Open the journal // Open the journal
pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->jfd < 0) { if (pPager->jfd < 0) {
tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -243,9 +246,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -243,9 +246,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
// sync the journal file // sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pPager->jfd);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return 0; return -1;
} }
// loop to write the dirty pages to file // loop to write the dirty pages to file
...@@ -255,7 +258,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -255,7 +258,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage = (SPage *)pNode; pPage = (SPage *)pNode;
ret = tdbPagerWritePageToDB(pPager, pPage); ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page to db since %s", tstrerror(terrno));
return -1; return -1;
} }
} }
...@@ -277,11 +280,25 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -277,11 +280,25 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
tRBTreeCreate(&pPager->rbt, pageCmpFn); tRBTreeCreate(&pPager->rbt, pageCmpFn);
// sync the db file // sync the db file
tdbOsFSync(pPager->fd); if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
// remove the journal file // remove the journal file
tdbOsClose(pPager->jfd); if (tdbOsClose(pPager->jfd) < 0) {
tdbOsRemove(pPager->jFileName); tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPager->inTran = 0; pPager->inTran = 0;
return 0; return 0;
...@@ -297,14 +314,14 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -297,14 +314,14 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
// 0, sync the journal file // 0, sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pPager->jfd);
if (ret < 0) { if (ret < 0) {
// TODO tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno);
return 0; return -1;
} }
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
if (jfd == NULL) { if (jfd == NULL) {
return 0; return -1;
} }
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
...@@ -516,11 +533,15 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { ...@@ -516,11 +533,15 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno)); ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno));
if (ret < 0) { if (ret < 0) {
tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -540,13 +561,15 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { ...@@ -540,13 +561,15 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); tdbError("failed to lseek due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -590,23 +613,38 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { ...@@ -590,23 +613,38 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
i64 offset = pPager->pageSize * (pgno - 1); i64 offset = pPager->pageSize * (pgno - 1);
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
ASSERT(0); tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize); ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) { if (ret < 0) {
ASSERT(0); tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, pPager->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
} }
tdbOsFSync(pPager->fd); if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
tdbOsFree(pageBuf); tdbOsFree(pageBuf);
tdbOsClose(jfd); if (tdbOsClose(jfd) < 0) {
tdbOsRemove(pPager->jFileName); tdbError("failed to close jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0; return 0;
} }
此差异已折叠。
...@@ -81,6 +81,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -81,6 +81,12 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(pWal);
return NULL;
}
// set config // set config
memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
...@@ -98,15 +104,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -98,15 +104,14 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
if (taosMkDir(pWal->path) != 0) { if (taosMkDir(pWal->path) != 0) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosMemoryFree(pWal); goto _err;
return NULL;
} }
// init ref // init ref
pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (pWal->pRefHash == NULL) { if (pWal->pRefHash == NULL) {
taosMemoryFree(pWal); wError("failed to init hash since %s", tstrerror(terrno));
return NULL; goto _err;
} }
// open meta // open meta
...@@ -117,9 +122,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -117,9 +122,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
taosHashCleanup(pWal->pRefHash); goto _err;
taosMemoryFree(pWal);
return NULL;
} }
// init status // init status
...@@ -131,46 +134,37 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -131,46 +134,37 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeHead.head.protoVer = WAL_PROTO_VER; pWal->writeHead.head.protoVer = WAL_PROTO_VER;
pWal->writeHead.magic = WAL_MAGIC; pWal->writeHead.magic = WAL_MAGIC;
if (taosThreadMutexInit(&pWal->mutex, NULL) < 0) { // load meta
taosArrayDestroy(pWal->fileInfoSet); (void)walLoadMeta(pWal);
taosHashCleanup(pWal->pRefHash);
taosMemoryFree(pWal);
return NULL;
}
pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal);
return NULL;
}
walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) { if (walCheckAndRepairMeta(pWal) < 0) {
wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId); wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash); goto _err;
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
return NULL;
} }
if (walCheckAndRepairIdx(pWal) < 0) { if (walCheckAndRepairIdx(pWal) < 0) {
wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId); wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash); goto _err;
taosRemoveRef(tsWal.refSetId, pWal->refId); }
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); // add ref
return NULL; pWal->refId = taosAddRef(tsWal.refSetId, pWal);
if (pWal->refId < 0) {
wError("failed to add ref for Wal since %s", tstrerror(terrno));
goto _err;
} }
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
pWal->cfg.fsyncPeriod); pWal->cfg.fsyncPeriod);
return pWal; return pWal;
_err:
taosArrayDestroy(pWal->fileInfoSet);
taosHashCleanup(pWal->pRefHash);
taosThreadMutexDestroy(&pWal->mutex);
taosMemoryFree(pWal);
pWal = NULL;
return NULL;
} }
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
...@@ -195,11 +189,11 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { ...@@ -195,11 +189,11 @@ int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
void walClose(SWal *pWal) { void walClose(SWal *pWal) {
taosThreadMutexLock(&pWal->mutex); taosThreadMutexLock(&pWal->mutex);
(void)walSaveMeta(pWal);
taosCloseFile(&pWal->pLogFile); taosCloseFile(&pWal->pLogFile);
pWal->pLogFile = NULL; pWal->pLogFile = NULL;
taosCloseFile(&pWal->pIdxFile); taosCloseFile(&pWal->pIdxFile);
pWal->pIdxFile = NULL; pWal->pIdxFile = NULL;
walSaveMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL; pWal->fileInfoSet = NULL;
taosHashCleanup(pWal->pRefHash); taosHashCleanup(pWal->pRefHash);
......
...@@ -181,7 +181,11 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { ...@@ -181,7 +181,11 @@ int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) {
SWalFileInfo tmpInfo; SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL); if (pRet == NULL) {
wError("failed to find WAL log file with ver:%lld", ver);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
}
if (pReader->curFileFirstVer != pRet->firstVer) { if (pReader->curFileFirstVer != pRet->firstVer) {
// error code was set inner // error code was set inner
if (walReadChangeFile(pReader, pRet->firstVer) < 0) { if (walReadChangeFile(pReader, pRet->firstVer) < 0) {
...@@ -472,7 +476,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -472,7 +476,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0); wError("vgId:%d, failed to read WAL record head, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
...@@ -505,6 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -505,6 +510,8 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
else { else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
wError("vgId:%d, failed to read WAL record body, index:%" PRId64 ", from log file since %s",
pReader->pWal->cfg.vgId, ver, terrstr());
taosThreadMutexUnlock(&pReader->mutex); taosThreadMutexUnlock(&pReader->mutex);
return -1; return -1;
} }
......
...@@ -79,6 +79,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -79,6 +79,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (pWal->pLogFile != NULL) { if (pWal->pLogFile != NULL) {
code = taosFsyncFile(pWal->pLogFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
code = taosCloseFile(&pWal->pLogFile); code = taosCloseFile(&pWal->pLogFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -86,6 +91,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) { ...@@ -86,6 +91,11 @@ int64_t walChangeWrite(SWal* pWal, int64_t ver) {
} }
} }
if (pWal->pIdxFile != NULL) { if (pWal->pIdxFile != NULL) {
code = taosFsyncFile(pWal->pIdxFile);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
code = taosCloseFile(&pWal->pIdxFile); code = taosCloseFile(&pWal->pIdxFile);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
...@@ -219,10 +219,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -219,10 +219,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
taosCloseFile(&pIdxFile); taosCloseFile(&pIdxFile);
taosCloseFile(&pLogFile); taosCloseFile(&pLogFile);
taosFsyncFile(pWal->pLogFile); code = walSaveMeta(pWal);
taosFsyncFile(pWal->pIdxFile); if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
walSaveMeta(pWal); taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
// unlock // unlock
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
...@@ -394,7 +396,11 @@ int32_t walRollImpl(SWal *pWal) { ...@@ -394,7 +396,11 @@ int32_t walRollImpl(SWal *pWal) {
pWal->lastRollSeq = walGetSeq(); pWal->lastRollSeq = walGetSeq();
walSaveMeta(pWal); code = walSaveMeta(pWal);
if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
goto END;
}
END: END:
return code; return code;
...@@ -550,6 +556,11 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in ...@@ -550,6 +556,11 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pIdxFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
strerror(errno));
}
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (taosFsyncFile(pWal->pLogFile) < 0) { if (taosFsyncFile(pWal->pLogFile) < 0) {
wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
......
...@@ -87,11 +87,17 @@ void osUpdate() { ...@@ -87,11 +87,17 @@ void osUpdate() {
void osCleanup() {} void osCleanup() {}
bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } bool osLogSpaceAvailable() { return tsLogSpace.size.avail > 0; }
bool osDataSpaceAvailable() { return tsDataSpace.reserved <= tsDataSpace.size.avail; } bool osDataSpaceAvailable() { return tsDataSpace.size.avail > 0; }
bool osTempSpaceAvailable() { return tsTempSpace.reserved <= tsTempSpace.size.avail; } bool osTempSpaceAvailable() { return tsTempSpace.size.avail > 0; }
bool osLogSpaceSufficient() { return tsLogSpace.size.avail > tsLogSpace.reserved; }
bool osDataSpaceSufficient() { return tsDataSpace.size.avail > tsDataSpace.reserved; }
bool osTempSpaceSufficient() { return tsTempSpace.size.avail > tsTempSpace.reserved; }
void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); } void osSetTimezone(const char *timezone) { taosSetSystemTimezone(timezone, tsTimezoneStr, &tsDaylight, &tsTimezone); }
......
...@@ -447,12 +447,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not f ...@@ -447,12 +447,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND, "TQ table schema not f
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "TQ no commited offset")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Wal unexpected generic error") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "WAL unexpected generic error")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_CHKSUM_MISMATCH, "WAL checksum mismatch")
// tfs // tfs
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册