From f7908813f5af5977f08cfd36194de1f7cca61711 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 12 Nov 2021 21:35:29 +0800 Subject: [PATCH] fix sdb bugs --- include/dnode/mnode/mnode.h | 6 +-- include/dnode/mnode/sdb/sdb.h | 4 +- source/dnode/mgmt/src/dnodeMnode.c | 10 ++-- source/dnode/mnode/impl/src/mnode.c | 9 ++-- source/dnode/mnode/impl/src/mnodeAcct.c | 4 +- source/dnode/mnode/impl/src/mnodeUser.c | 8 +-- source/dnode/mnode/sdb/inc/sdbInt.h | 2 + source/dnode/mnode/sdb/src/sdbFile.c | 48 ++++++++++-------- source/dnode/mnode/sdb/src/sdbHash.c | 65 +++++++++++++++---------- 9 files changed, 93 insertions(+), 63 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 20de27e59d..e03d3ffd18 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -55,9 +55,9 @@ typedef struct { int32_t mnodeInit(SMnodePara para); void mnodeCleanup(); -int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg); -void mnodeUnDeploy(char *path); -int32_t mnodeStart(char *path, SMnodeCfg *pCfg); +int32_t mnodeDeploy(SMnodeCfg *pCfg); +void mnodeUnDeploy(); +int32_t mnodeStart(SMnodeCfg *pCfg); int32_t mnodeAlter(SMnodeCfg *pCfg); void mnodeStop(); diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index dc2f52fac3..0f4913069f 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -157,8 +157,8 @@ void sdbUnDeploy(); void *sdbAcquire(ESdbType sdb, void *pKey); void sdbRelease(void *pObj); -void *sdbFetch(ESdbType sdb, void *pIter); -void sdbCancelFetch(ESdbType sdb, void *pIter); +void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj); +void sdbCancelFetch(void *pIter); int32_t sdbGetSize(ESdbType sdb); SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen); diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index f6726bf981..5a1ac3b070 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -136,8 +136,8 @@ static int32_t dnodeWriteMnodeFile() { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.dropped); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsMnode.dropped); + len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", tsMnode.dropped); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -180,7 +180,7 @@ static int32_t dnodeStartMnode() { tsMnode.deployed = 1; taosWUnLockLatch(&tsMnode.latch); - return code; + return mnodeStart(NULL); } static void dnodeStopMnode() { @@ -212,14 +212,14 @@ static int32_t dnodeUnDeployMnode() { } dnodeStopMnode(); - mnodeUnDeploy(tsMnodeDir); + mnodeUnDeploy(); dnodeWriteMnodeFile(); return code; } static int32_t dnodeDeployMnode(SMnodeCfg *pCfg) { - int32_t code = mnodeDeploy(tsMnodeDir, pCfg); + int32_t code = mnodeDeploy(pCfg); if (code != 0) { dError("failed to deploy mnode since %s", tstrerror(code)); return code; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 21db85fcfa..763780ef46 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -134,10 +134,11 @@ static int32_t mnodeAllocInitSteps() { } static int32_t mnodeAllocStartSteps() { - struct SSteps *steps = taosStepInit(7, NULL); + struct SSteps *steps = taosStepInit(8, NULL); if (steps == NULL) return -1; taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL); + taosStepAdd(steps, "mnode-sdb-file", sdbRead, (CleanupFp)sdbCommit); taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance); taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile); taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow); @@ -170,7 +171,7 @@ int32_t mnodeInit(SMnodePara para) { void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); } -int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { +int32_t mnodeDeploy(SMnodeCfg *pCfg) { if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) { if (sdbDeploy() != 0) { mError("failed to deploy sdb since %s", terrstr()); @@ -182,9 +183,9 @@ int32_t mnodeDeploy(char *path, SMnodeCfg *pCfg) { return 0; } -void mnodeUnDeploy(char *path) { sdbUnDeploy(); } +void mnodeUnDeploy() { sdbUnDeploy(); } -int32_t mnodeStart(char *path, SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } +int32_t mnodeStart(SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index 79310e13aa..6f5c498ed2 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -73,7 +73,9 @@ static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } static int32_t mnodeAcctActionDelete(SAcctObj *pAcct) { return 0; } static int32_t mnodeAcctActionUpdate(SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { - memcpy(pDstAcct, pSrcAcct, (int32_t)((char *)&pDstAcct->info - (char *)&pDstAcct)); + SAcctObj tObj; + int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj); + memcpy(pDstAcct, pSrcAcct, len); return 0; } diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index ac215932ed..63aa171238 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -28,7 +28,7 @@ static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { int32_t dataPos = 0; SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN) SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN) - SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_KEY_LEN) + SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth) @@ -46,7 +46,7 @@ static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) { return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj)); + SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj)); SUserObj *pUser = sdbGetRowObj(pRow); if (pUser == NULL) return NULL; @@ -92,7 +92,9 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) { } static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { - memcpy(pDstUser, pSrcUser, (int32_t)((char *)&pDstUser->prohibitDbHash - (char *)&pDstUser)); + SUserObj tObj; + int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj); + memcpy(pDstUser, pSrcUser, len); return 0; } diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 30023d51ab..aa0b3c8a58 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -71,6 +71,8 @@ typedef struct { extern SSdbMgr tsSdb; +int32_t sdbWriteImp(SSdbRaw *pRaw); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index a6a6f54879..0de1bf85a9 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -66,7 +66,7 @@ static int32_t sdbReadDataFile() { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); - FileFd fd = taosOpenFileCreateWrite(file); + FileFd fd = taosOpenFileRead(file); if (fd <= 0) { free(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); @@ -76,9 +76,12 @@ static int32_t sdbReadDataFile() { int64_t offset = 0; int32_t code = 0; + int32_t readLen = 0; + int64_t ret = 0; while (1) { - int64_t ret = taosReadFile(fd, pRaw, sizeof(SSdbRaw)); + readLen = sizeof(SSdbRaw); + ret = taosReadFile(fd, pRaw, readLen); if (ret == 0) break; if (ret < 0) { @@ -87,33 +90,34 @@ static int32_t sdbReadDataFile() { break; } - if (ret < sizeof(SSdbRaw)) { + if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; mError("failed to read file:%s since %s", file, tstrerror(code)); break; } - ret = taosReadFile(fd, pRaw->pData, pRaw->dataLen + sizeof(int32_t)); + readLen = pRaw->dataLen + sizeof(int32_t); + ret = taosReadFile(fd, pRaw->pData, readLen); if (ret < 0) { code = TAOS_SYSTEM_ERROR(errno); mError("failed to read file:%s since %s", file, tstrerror(code)); break; } - if (ret < pRaw->dataLen + sizeof(int32_t)) { + if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; mError("failed to read file:%s since %s", file, tstrerror(code)); break; } - uint32_t cksum = *(int32_t *)(pRaw->pData + pRaw->dataLen); - if (!taosCheckChecksumWhole(pRaw, sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t)) != 0) { + int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); + if (!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen) != 0) { code = TSDB_CODE_CHECKSUM_ERROR; mError("failed to read file:%s since %s", file, tstrerror(code)); break; } - code = sdbWrite(pRaw); + code = sdbWriteImp(pRaw); if (code != 0) { mError("failed to read file:%s since %s", file, terrstr()); goto PARSE_SDB_DATA_ERROR; @@ -150,39 +154,47 @@ static int32_t sdbWriteDataFile() { SRWLatch *pLock = &tsSdb.locks[i]; taosWLockLatch(pLock); - SSdbRow *pRow = taosHashIterate(hash, NULL); - while (pRow != NULL) { - if (pRow->status != SDB_STATUS_READY) continue; + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL || pRow->status != SDB_STATUS_READY) { + ppRow = taosHashIterate(hash, ppRow); + continue; + } SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); if (pRaw != NULL) { + pRaw->status = pRow->status; int32_t writeLen = sizeof(SSdbRaw) + pRaw->dataLen; if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { code = TAOS_SYSTEM_ERROR(terrno); + taosHashCancelIterate(hash, ppRow); break; } - int32_t cksum = taosCalcChecksum(0, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); + + int32_t cksum = taosCalcChecksum(0, (const uint8_t *)pRaw, sizeof(SSdbRaw) + pRaw->dataLen); if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { code = TAOS_SYSTEM_ERROR(terrno); + taosHashCancelIterate(hash, ppRow); break; } } else { - taosHashCancelIterate(hash, pRow); code = TSDB_CODE_SDB_APP_ERROR; + taosHashCancelIterate(hash, ppRow); break; } - pRow = taosHashIterate(hash, pRow); + ppRow = taosHashIterate(hash, ppRow); } taosWUnLockLatch(pLock); } - taosCloseFile(fd); - if (code == 0) { code = taosFsyncFile(fd); } + taosCloseFile(fd); + if (code == 0) { char curfile[PATH_MAX] = {0}; snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); @@ -210,13 +222,11 @@ int32_t sdbRead() { } int32_t sdbCommit() { - mDebug("start to commit mnode file"); + mDebug("start to write mnode file"); return sdbWriteDataFile(); } int32_t sdbDeploy() { - mDebug("start to deploy mnode"); - if (sdbCreateDir() != 0) { return -1; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 7b1a68bba2..1092d39328 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -86,12 +86,12 @@ static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_ SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; taosRLockLatch(pLock); - SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize); - if (pDstRow == NULL) { - terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; + SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); + if (ppDstRow == NULL || *ppDstRow == NULL) { taosRUnLockLatch(pLock); - return -1; + return sdbInsertRow(hash, pRaw, pRow, keySize); } + SSdbRow *pDstRow = *ppDstRow; pRow->status = pRaw->status; taosRUnLockLatch(pLock); @@ -110,12 +110,13 @@ static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_ SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; taosWLockLatch(pLock); - SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize); - if (pDstRow == NULL) { + SSdbRow **ppDstRow = taosHashGet(hash, pRow->pObj, keySize); + if (ppDstRow == NULL || *ppDstRow) { terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; taosWUnLockLatch(pLock); return -1; } + SSdbRow *pDstRow = *ppDstRow; pRow->status = pRaw->status; taosHashRemove(hash, pRow->pObj, keySize); @@ -123,16 +124,14 @@ static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_ SdbDeleteFp deleteFp = tsSdb.deleteFps[pRow->sdb]; if (deleteFp != NULL) { - if ((*deleteFp)(pRow->pObj) != 0) { - return -1; - } + (void)(*deleteFp)(pRow->pObj); } sdbRelease(pRow->pObj); return 0; } -int32_t sdbWrite(SSdbRaw *pRaw) { +int32_t sdbWriteImp(SSdbRaw *pRaw) { SHashObj *hash = sdbGetHash(pRaw->sdb); if (hash == NULL) return -1; @@ -171,6 +170,12 @@ int32_t sdbWrite(SSdbRaw *pRaw) { return 0; } +int32_t sdbWrite(SSdbRaw *pRaw) { + int32_t code = sdbWriteImp(pRaw); + sdbFreeRaw(pRaw); + return code; +} + void *sdbAcquire(ESdbType sdb, void *pKey) { SHashObj *hash = sdbGetHash(sdb); if (hash == NULL) return NULL; @@ -182,7 +187,7 @@ void *sdbAcquire(ESdbType sdb, void *pKey) { taosRLockLatch(pLock); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); - if (ppRow == NULL || *ppRow) { + if (ppRow == NULL || *ppRow == NULL) { terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; taosRUnLockLatch(pLock); return NULL; @@ -226,27 +231,37 @@ void sdbRelease(void *pObj) { taosRUnLockLatch(pLock); } -void *sdbFetchRow(ESdbType sdb, void *pIter) { +void *sdbFetch(ESdbType sdb, void *pIter, void **ppObj) { SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return NULL; - } + if (hash == NULL) return NULL; SRWLatch *pLock = &tsSdb.locks[sdb]; taosRLockLatch(pLock); - void *pRet = taosHashIterate(hash, pIter); + + SSdbRow **ppRow = taosHashIterate(hash, ppRow); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL || pRow->status != SDB_STATUS_READY) { + ppRow = taosHashIterate(hash, ppRow); + continue; + } + + atomic_add_fetch_32(&pRow->refCount, 1); + *ppObj = pRow->pObj; + break; + } taosRUnLockLatch(pLock); - return pRet; + return ppRow; } -void sdbCancelFetch(ESdbType sdb, void *pIter) { - SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return; - } +void sdbCancelFetch(void *pIter) { + if (pIter == NULL) return; + SSdbRow *pRow = *(SSdbRow **)pIter; + SHashObj *hash = sdbGetHash(pRow->sdb); + if (hash == NULL) return; - SRWLatch *pLock = &tsSdb.locks[sdb]; + SRWLatch *pLock = &tsSdb.locks[pRow->sdb]; taosRLockLatch(pLock); taosHashCancelIterate(hash, pIter); taosRUnLockLatch(pLock); @@ -254,9 +269,7 @@ void sdbCancelFetch(ESdbType sdb, void *pIter) { int32_t sdbGetSize(ESdbType sdb) { SHashObj *hash = sdbGetHash(sdb); - if (hash == NULL) { - return 0; - } + if (hash == NULL) return 0; SRWLatch *pLock = &tsSdb.locks[sdb]; taosRLockLatch(pLock); -- GitLab