提交 7a37edce 编写于 作者: S Shengliang Guan

fix sdb errors

上级 f26cf444
...@@ -95,11 +95,21 @@ extern "C" { ...@@ -95,11 +95,21 @@ extern "C" {
#define SDB_SET_DATALEN(pRaw, dataLen) \ #define SDB_SET_DATALEN(pRaw, dataLen) \
{ \ { \
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ }; \
} }
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum {
SDB_STATUS_CREATING = 1,
SDB_STATUS_READY = 2,
SDB_STATUS_DROPPING = 3,
SDB_STATUS_DROPPED = 4
} ESdbStatus;
typedef enum { typedef enum {
SDB_START = 0, SDB_START = 0,
SDB_TRANS = 1, SDB_TRANS = 1,
...@@ -116,12 +126,6 @@ typedef enum { ...@@ -116,12 +126,6 @@ typedef enum {
SDB_MAX = 12 SDB_MAX = 12
} ESdbType; } ESdbType;
typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus;
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbInsertFp)(void *pObj);
typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeleteFp)(void *pObj);
...@@ -165,7 +169,6 @@ int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); ...@@ -165,7 +169,6 @@ int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val);
int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen); int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen);
int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen);
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status);
int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action);
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val);
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val);
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val);
......
...@@ -136,13 +136,15 @@ int32_t* taosGetErrno(); ...@@ -136,13 +136,15 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) #define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322)
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0323) #define TSDB_CODE_SDB_OBJ_CREATING TAOS_DEF_ERROR_CODE(0, 0x0323)
#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0324) #define TSDB_CODE_SDB_OBJ_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0324)
#define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325) #define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325)
#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) #define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326)
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327) #define TSDB_CODE_SDB_INVALID_ACTION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0327)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328) #define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0328)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x0329) #define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0329)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x032A)
#define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x032B)
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists") #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists")
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist") #define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist")
......
...@@ -94,6 +94,7 @@ static int32_t mnodeCreateDefaultAcct() { ...@@ -94,6 +94,7 @@ static int32_t mnodeCreateDefaultAcct() {
SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj); SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj);
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pRaw); return sdbWrite(pRaw);
} }
......
...@@ -110,6 +110,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { ...@@ -110,6 +110,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pRaw = mnodeUserActionEncode(&userObj);
if (pRaw == NULL) return -1; if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
return sdbWrite(pRaw); return sdbWrite(pRaw);
} }
...@@ -149,15 +150,13 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -149,15 +150,13 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
sdbSetRawAction(pRedoRaw, SDB_ACTION_INSERT);
SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj);
if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPING); sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
sdbSetRawAction(pUndoRaw, SDB_ACTION_DELETE);
SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj);
if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) {
...@@ -165,7 +164,6 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -165,7 +164,6 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
return -1; return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
sdbSetRawAction(pCommitRaw, SDB_ACTION_UPDATE);
if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { if (trnPrepare(pTrans, mnodeSyncPropose) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
......
...@@ -40,14 +40,14 @@ typedef struct SSdbRaw { ...@@ -40,14 +40,14 @@ typedef struct SSdbRaw {
int8_t sdb; int8_t sdb;
int8_t sver; int8_t sver;
int8_t status; int8_t status;
int8_t action; int8_t reserved;
int8_t reserved[4];
int32_t cksum; int32_t cksum;
int32_t dataLen; int32_t dataLen;
char pData[]; char pData[];
} SSdbRaw; } SSdbRaw;
typedef struct SSdbRow { typedef struct SSdbRow {
ESdbType sdb;
ESdbStatus status; ESdbStatus status;
int32_t refCount; int32_t refCount;
char pObj[]; char pObj[];
......
...@@ -45,7 +45,7 @@ int32_t sdbInit() { ...@@ -45,7 +45,7 @@ int32_t sdbInit() {
type = TSDB_DATA_TYPE_BINARY; type = TSDB_DATA_TYPE_BINARY;
} }
SHashObj *hash = taosHashInit(128, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) { if (hash == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -17,8 +17,9 @@ ...@@ -17,8 +17,9 @@
#include "sdbInt.h" #include "sdbInt.h"
#include "tglobal.h" #include "tglobal.h"
static int32_t sdbCreateDir() { static int32_t sdbCreateDir() {
mDebug("start to create mnode at %s", tsMnodeDir);
if (!taosMkDir(tsSdb.currDir)) { if (!taosMkDir(tsSdb.currDir)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr()); mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr());
...@@ -41,6 +42,8 @@ static int32_t sdbCreateDir() { ...@@ -41,6 +42,8 @@ static int32_t sdbCreateDir() {
} }
static int32_t sdbRunDeployFp() { static int32_t sdbRunDeployFp() {
mDebug("start to run deploy functions");
for (int32_t i = SDB_START; i < SDB_MAX; ++i) { for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
SdbDeployFp fp = tsSdb.deployFps[i]; SdbDeployFp fp = tsSdb.deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
...@@ -58,25 +61,26 @@ static int32_t sdbWriteVersion(FileFd fd) { return 0; } ...@@ -58,25 +61,26 @@ static int32_t sdbWriteVersion(FileFd fd) { return 0; }
static int32_t sdbReadVersion(FileFd fd) { return 0; } static int32_t sdbReadVersion(FileFd fd) { return 0; }
static int32_t sdbReadDataFile() { static int32_t sdbReadDataFile() {
int32_t code = 0;
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) { if (pRaw == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir); snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
FileFd fd = taosOpenFileCreateWrite(file); FileFd fd = taosOpenFileCreateWrite(file);
if (fd <= 0) { if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for read since %s", file, tstrerror(code)); mError("failed to open file:%s for read since %s", file, terrstr());
return code; return -1;
} }
int64_t offset = 0; int64_t offset = 0;
int32_t code = 0;
while (1) { while (1) {
int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw)); int64_t ret = taosReadFile(fd, pRaw, sizeof(SSdbRaw));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
...@@ -93,7 +97,7 @@ static int32_t sdbReadDataFile() { ...@@ -93,7 +97,7 @@ static int32_t sdbReadDataFile() {
code = sdbWrite(pRaw); code = sdbWrite(pRaw);
if (code != 0) { if (code != 0) {
mError("failed to read file:%s since %s", file, tstrerror(code)); mError("failed to read file:%s since %s", file, terrstr());
goto PARSE_SDB_DATA_ERROR; goto PARSE_SDB_DATA_ERROR;
} }
} }
...@@ -102,32 +106,35 @@ static int32_t sdbReadDataFile() { ...@@ -102,32 +106,35 @@ static int32_t sdbReadDataFile() {
PARSE_SDB_DATA_ERROR: PARSE_SDB_DATA_ERROR:
taosCloseFile(fd); taosCloseFile(fd);
terrno = code;
return code; return code;
} }
static int32_t sdbWriteDataFile() { static int32_t sdbWriteDataFile() {
int32_t code = 0;
char tmpfile[PATH_MAX] = {0}; char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir); snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);
FileFd fd = taosOpenFileCreateWrite(tmpfile); FileFd fd = taosOpenFileCreateWrite(tmpfile);
if (fd <= 0) { if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code)); mError("failed to open file:%s for write since %s", tmpfile, terrstr());
return code; return -1;
} }
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { int32_t code = 0;
SHashObj *hash = tsSdb.hashObjs[i];
if (!hash) continue;
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SdbEncodeFp encodeFp = tsSdb.encodeFps[i]; SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
if (!encodeFp) continue; if (encodeFp == NULL) continue;
SHashObj *hash = tsSdb.hashObjs[i];
SRWLatch *pLock = &tsSdb.locks[i];
taosWLockLatch(pLock);
SSdbRow *pRow = taosHashIterate(hash, NULL); SSdbRow *pRow = taosHashIterate(hash, NULL);
while (pRow != NULL) { while (pRow != NULL) {
if (pRow->status == SDB_STATUS_READY) continue; if (pRow->status != SDB_STATUS_READY) continue;
SSdbRaw *pRaw = (*encodeFp)(pRow->pObj); SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
if (pRaw != NULL) { if (pRaw != NULL) {
taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
...@@ -139,6 +146,7 @@ static int32_t sdbWriteDataFile() { ...@@ -139,6 +146,7 @@ static int32_t sdbWriteDataFile() {
pRow = taosHashIterate(hash, pRow); pRow = taosHashIterate(hash, pRow);
} }
taosWUnLockLatch(pLock);
} }
if (code == 0) { if (code == 0) {
...@@ -151,41 +159,40 @@ static int32_t sdbWriteDataFile() { ...@@ -151,41 +159,40 @@ static int32_t sdbWriteDataFile() {
code = taosFsyncFile(fd); code = taosFsyncFile(fd);
} }
if (code != 0) { if (code == 0) {
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir); snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
code = taosRenameFile(tmpfile, curfile); code = taosRenameFile(tmpfile, curfile);
} }
if (code != 0) { if (code != 0) {
mError("failed to write sdb file since %s", tstrerror(code)); terrno = code;
mError("failed to write sdb file since %s", terrstr());
} else { } else {
mInfo("write sdb file successfully"); mDebug("write sdb file successfully");
} }
return code; return code;
} }
int32_t sdbRead() { int32_t sdbRead() {
int32_t code = sdbReadDataFile(); mDebug("start to read mnode file");
if (code != 0) {
return code; if (sdbReadDataFile() != 0) {
return -1;
} }
mInfo("read sdb file successfully"); return 0;
return -1;
} }
int32_t sdbCommit() { int32_t sdbCommit() {
int32_t code = sdbWriteDataFile(); mDebug("start to commit mnode file");
if (code != 0) { return sdbWriteDataFile();
return code;
}
return 0;
} }
int32_t sdbDeploy() { int32_t sdbDeploy() {
mDebug("start to deploy mnode");
if (sdbCreateDir() != 0) { if (sdbCreateDir() != 0) {
return -1; return -1;
} }
...@@ -201,4 +208,7 @@ int32_t sdbDeploy() { ...@@ -201,4 +208,7 @@ int32_t sdbDeploy() {
return 0; return 0;
} }
void sdbUnDeploy() {} void sdbUnDeploy() {
mDebug("start to undeploy mnode");
taosRemoveDir(tsMnodeDir);
}
...@@ -32,25 +32,49 @@ static SHashObj *sdbGetHash(int32_t sdb) { ...@@ -32,25 +32,49 @@ static SHashObj *sdbGetHash(int32_t sdb) {
return hash; return hash;
} }
static int32_t sdbGetkeySize(ESdbType sdb, void *pKey) {
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[sdb];
if (keyType == SDB_KEY_INT32) {
keySize = sizeof(int32_t);
} else if (keyType == SDB_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
}
return keySize;
}
static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL; SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
taosWLockLatch(pLock);
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
if (pDstRow != NULL) { if (pDstRow != NULL) {
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
taosWUnLockLatch(pLock);
return -1; return -1;
} }
pRow->refCount = 0; pRow->refCount = 1;
pRow->status = pRaw->status; pRow->status = pRaw->status;
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosWUnLockLatch(pLock);
return -1; return -1;
} }
SdbInsertFp insertFp = tsSdb.insertFps[pRaw->sdb]; taosWUnLockLatch(pLock);
SdbInsertFp insertFp = tsSdb.insertFps[pRow->sdb];
if (insertFp != NULL) { if (insertFp != NULL) {
if ((*insertFp)(pRow->pObj) != 0) { if ((*insertFp)(pRow->pObj) != 0) {
taosWLockLatch(pLock);
taosHashRemove(hash, pRow->pObj, keySize); taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock);
return -1; return -1;
} }
} }
...@@ -59,13 +83,20 @@ static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_ ...@@ -59,13 +83,20 @@ static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
} }
static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL; SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
taosRLockLatch(pLock);
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
if (pDstRow == NULL) { if (pDstRow == NULL) {
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
taosRUnLockLatch(pLock);
return -1; return -1;
} }
SdbUpdateFp updateFp = tsSdb.updateFps[pRaw->sdb]; pRow->status = pRaw->status;
taosRUnLockLatch(pLock);
SdbUpdateFp updateFp = tsSdb.updateFps[pRow->sdb];
if (updateFp != NULL) { if (updateFp != NULL) {
if ((*updateFp)(pRow->pObj, pDstRow->pObj) != 0) { if ((*updateFp)(pRow->pObj, pDstRow->pObj) != 0) {
return -1; return -1;
...@@ -76,20 +107,28 @@ static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_ ...@@ -76,20 +107,28 @@ static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_
} }
static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL; SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
taosWLockLatch(pLock);
SSdbRow *pDstRow = taosHashGet(hash, pRow->pObj, keySize);
if (pDstRow == NULL) { if (pDstRow == NULL) {
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
taosWUnLockLatch(pLock);
return -1; return -1;
} }
SdbDeleteFp deleteFp = tsSdb.deleteFps[pRaw->sdb]; pRow->status = pRaw->status;
taosHashRemove(hash, pRow->pObj, keySize);
taosWUnLockLatch(pLock);
SdbDeleteFp deleteFp = tsSdb.deleteFps[pRow->sdb];
if (deleteFp != NULL) { if (deleteFp != NULL) {
if ((*deleteFp)(pRow->pObj) != 0) { if ((*deleteFp)(pRow->pObj) != 0) {
return -1; return -1;
} }
} }
taosHashRemove(hash, pRow->pObj, keySize); sdbRelease(pRow->pObj);
return 0; return 0;
} }
...@@ -104,73 +143,87 @@ int32_t sdbWrite(SSdbRaw *pRaw) { ...@@ -104,73 +143,87 @@ int32_t sdbWrite(SSdbRaw *pRaw) {
return -1; return -1;
} }
int32_t keySize; pRow->sdb = pRaw->sdb;
EKeyType keyType = tsSdb.keyTypes[pRaw->sdb];
if (keyType == SDB_KEY_INT32) {
keySize = sizeof(int32_t);
} else if (keyType == SDB_KEY_BINARY) {
keySize = strlen(pRow->pObj) + 1;
} else {
keySize = sizeof(int64_t);
}
int32_t keySize = sdbGetkeySize(pRow->sdb, pRow->pObj);
int32_t code = -1; int32_t code = -1;
if (pRaw->action == SDB_ACTION_INSERT) {
code = sdbInsertRow(hash, pRaw, pRow, keySize); switch (pRaw->status) {
} else if (pRaw->action == SDB_ACTION_UPDATE) { case SDB_STATUS_CREATING:
code = sdbUpdateRow(hash, pRaw, pRow, keySize); code = sdbInsertRow(hash, pRaw, pRow, keySize);
} else if (pRaw->action == SDB_ACTION_DELETE) { break;
code = sdbDeleteRow(hash, pRaw, pRow, keySize); case SDB_STATUS_READY:
} else { case SDB_STATUS_DROPPING:
terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE; code = sdbUpdateRow(hash, pRaw, pRow, keySize);
break;
case SDB_STATUS_DROPPED:
code = sdbDeleteRow(hash, pRaw, pRow, keySize);
break;
default:
terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
break;
} }
if (code != 0) { if (code != 0) {
sdbFreeRow(pRow); sdbFreeRow(pRow);
} }
return 0; return 0;
} }
void *sdbAcquire(ESdbType sdb, void *pKey) { void *sdbAcquire(ESdbType sdb, void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(sdb); SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) { if (hash == NULL) return NULL;
void *pRet = NULL;
int32_t keySize = sdbGetkeySize(sdb, pKey);
SRWLatch *pLock = &tsSdb.locks[sdb];
taosRLockLatch(pLock);
SSdbRow **ppRow = taosHashGet(hash, pKey, keySize);
if (ppRow == NULL || *ppRow) {
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
taosRUnLockLatch(pLock);
return NULL; return NULL;
} }
int32_t keySize; SSdbRow *pRow = *ppRow;
EKeyType keyType = tsSdb.keyTypes[sdb]; switch (pRow->status) {
case SDB_STATUS_READY:
switch (keyType) { atomic_add_fetch_32(&pRow->refCount, 1);
case SDB_KEY_INT32: pRet = pRow->pObj;
keySize = sizeof(int32_t);
break; break;
case SDB_KEY_INT64: case SDB_STATUS_CREATING:
keySize = sizeof(int64_t); terrno = TSDB_CODE_SDB_OBJ_CREATING;
break; break;
case SDB_KEY_BINARY: case SDB_STATUS_DROPPING:
keySize = strlen(pKey) + 1; terrno = TSDB_CODE_SDB_OBJ_DROPPING;
break; break;
default: default:
keySize = sizeof(int32_t); terrno = TSDB_CODE_SDB_APP_ERROR;
break;
} }
SSdbRow *pRow = taosHashGet(hash, pKey, keySize); taosRUnLockLatch(pLock);
if (pRow == NULL) return NULL; return pRet;
if (pRow->status == SDB_STATUS_READY) {
atomic_add_fetch_32(&pRow->refCount, 1);
return pRow->pObj;
} else {
terrno = -1; // todo
return NULL;
}
} }
void sdbRelease(void *pObj) { void sdbRelease(void *pObj) {
if (pObj == NULL) return;
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
atomic_sub_fetch_32(&pRow->refCount, 1); if (pRow->sdb >= SDB_MAX || pRow->sdb <= SDB_START) return;
SRWLatch *pLock = &tsSdb.locks[pRow->sdb];
taosRLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pRow);
}
taosRUnLockLatch(pLock);
} }
void *sdbFetchRow(ESdbType sdb, void *pIter) { void *sdbFetchRow(ESdbType sdb, void *pIter) {
...@@ -179,7 +232,12 @@ void *sdbFetchRow(ESdbType sdb, void *pIter) { ...@@ -179,7 +232,12 @@ void *sdbFetchRow(ESdbType sdb, void *pIter) {
return NULL; return NULL;
} }
return taosHashIterate(hash, pIter); SRWLatch *pLock = &tsSdb.locks[sdb];
taosRLockLatch(pLock);
void *pRet = taosHashIterate(hash, pIter);
taosRUnLockLatch(pLock);
return pRet;
} }
void sdbCancelFetch(ESdbType sdb, void *pIter) { void sdbCancelFetch(ESdbType sdb, void *pIter) {
...@@ -187,7 +245,11 @@ void sdbCancelFetch(ESdbType sdb, void *pIter) { ...@@ -187,7 +245,11 @@ void sdbCancelFetch(ESdbType sdb, void *pIter) {
if (hash == NULL) { if (hash == NULL) {
return; return;
} }
SRWLatch *pLock = &tsSdb.locks[sdb];
taosRLockLatch(pLock);
taosHashCancelIterate(hash, pIter); taosHashCancelIterate(hash, pIter);
taosRUnLockLatch(pLock);
} }
int32_t sdbGetSize(ESdbType sdb) { int32_t sdbGetSize(ESdbType sdb) {
...@@ -195,5 +257,11 @@ int32_t sdbGetSize(ESdbType sdb) { ...@@ -195,5 +257,11 @@ int32_t sdbGetSize(ESdbType sdb) {
if (hash == NULL) { if (hash == NULL) {
return 0; return 0;
} }
return taosHashGetSize(hash);
SRWLatch *pLock = &tsSdb.locks[sdb];
taosRLockLatch(pLock);
int32_t size = taosHashGetSize(hash);
taosRUnLockLatch(pLock);
return size;
} }
...@@ -116,16 +116,6 @@ int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) { ...@@ -116,16 +116,6 @@ int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) {
return 0; return 0;
} }
int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
pRaw->action = action;
return 0;
}
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) { int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) {
if (pRaw == NULL) { if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
......
...@@ -147,6 +147,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_INIT_STEP, "failed to init compon ...@@ -147,6 +147,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FAILED_TO_INIT_STEP, "failed to init compon
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_APP_ERROR, "Unexpected generic error in sdb") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_APP_ERROR, "Unexpected generic error in sdb")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_NOT_THERE, "Object not there") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_NOT_THERE, "Object not there")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_CREATING, "Object is creating")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_DROPPING, "Object is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_TABLE_TYPE, "Invalid table type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_TABLE_TYPE, "Invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_KEY_TYPE, "Invalid key type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_KEY_TYPE, "Invalid key type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册