提交 f26cf444 编写于 作者: S Shengliang Guan

refact sdb

上级 4910ea29
...@@ -20,58 +20,84 @@ ...@@ -20,58 +20,84 @@
extern "C" { extern "C" {
#endif #endif
#define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \ #define SDB_GET_INT64(pData, pRow, dataPos, val) \
if (code == 0) { \ { \
if ((dataLen) >= (valLen)) { \ if (sdbGetRawInt64(pRaw, dataPos, val) != 0) { \
memcpy((val), (char *)(pData), (valLen)); \ sdbFreeRow(pRow); \
(dataLen) -= (valLen); \ return NULL; \
(pData) = (char *)(pData) + (valLen); \ } \
} else { \ dataPos += sizeof(int64_t); \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
} }
#define SDB_GET_INT32_VAL(pData, dataLen, val, code) \ #define SDB_GET_INT32(pData, pRow, dataPos, val) \
if (code == 0) { \ { \
if (dataLen >= sizeof(int32_t)) { \ if (sdbGetRawInt32(pRaw, dataPos, val) != 0) { \
*(int32_t *)(pData) = (int32_t)(val); \ sdbFreeRow(pRow); \
(dataLen) -= sizeof(int32_t); \ return NULL; \
(pData) = (char *)(pData) + sizeof(int32_t); \ } \
} else { \ dataPos += sizeof(int32_t); \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
} }
#define SDB_GET_INT64_VAL(pData, dataLen, val, code) \ #define SDB_GET_INT8(pData, pRow, dataPos, val) \
if (code == 0) { \ { \
if (dataLen >= sizeof(int64_t)) { \ if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \
*(int64_t *)(pData) = (int64_t)(val); \ sdbFreeRow(pRow); \
(dataLen) -= sizeof(int64_t); \ return NULL; \
(pData) = (char *)(pData) + sizeof(int64_t); \ } \
} else { \ dataPos += sizeof(int8_t); \
code = TSDB_CODE_SDB_INVALID_DATA_LEN; \
} \
} }
#define SDB_SET_INT64_VAL(pData, dataLen, val) \ #define SDB_GET_BINARY(pRaw, pRow, dataPos, val, valLen) \
{ \ { \
*(int64_t *)(pData) = (int64_t)(val); \ if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
(dataLen) += sizeof(int64_t); \ sdbFreeRow(pRow); \
(pData) += sizeof(int64_t); \ return NULL; \
} \
dataPos += valLen; \
} }
#define SDB_SET_INT32_VAL(pData, dataLen, val) \ #define SDB_SET_INT64(pData, dataPos, val) \
{ \ { \
*(int32_t *)(pData) = (int32_t)(val); \ if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
(dataLen) += sizeof(int32_t); \ sdbFreeRaw(pRaw); \
(pData) += sizeof(int32_t); \ return NULL; \
}; \
dataPos += sizeof(int64_t); \
} }
#define SDB_SET_BINARY_VAL(pData, dataLen, val, valLen) \ #define SDB_SET_INT32(pData, dataPos, val) \
{ \ { \
memcpy((char *)(pData), (val), (valLen)); \ if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
(dataLen) += (valLen); \ sdbFreeRaw(pRaw); \
(pData) += (valLen); \ return NULL; \
}; \
dataPos += sizeof(int32_t); \
}
#define SDB_SET_INT8(pData, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += sizeof(int8_t); \
}
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
{ \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
dataPos += valLen; \
}
#define SDB_SET_DATALEN(pRaw, dataLen) \
{ \
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
}; \
} }
typedef enum { typedef enum {
...@@ -93,23 +119,14 @@ typedef enum { ...@@ -93,23 +119,14 @@ typedef enum {
typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction; typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus; typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus;
typedef struct SSdbRaw SSdbRaw;
typedef struct { typedef struct SSdbRow SSdbRow;
int8_t type;
int8_t sver;
int8_t status;
int8_t action;
int8_t reserved[4];
int32_t cksum;
int32_t dataLen;
char data[];
} SSdbRaw;
typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbInsertFp)(void *pObj);
typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeleteFp)(void *pObj);
typedef int32_t (*SdbDeployFp)(); typedef int32_t (*SdbDeployFp)();
typedef void *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef struct { typedef struct {
...@@ -140,6 +157,26 @@ void *sdbFetch(ESdbType sdb, void *pIter); ...@@ -140,6 +157,26 @@ void *sdbFetch(ESdbType sdb, void *pIter);
void sdbCancelFetch(ESdbType sdb, void *pIter); void sdbCancelFetch(ESdbType sdb, void *pIter);
int32_t sdbGetSize(ESdbType sdb); int32_t sdbGetSize(ESdbType sdb);
SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val);
int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen);
int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen);
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status);
int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action);
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val);
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val);
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val);
int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen);
int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw);
SSdbRow *sdbAllocRow(int32_t objSize);
void sdbFreeRow(SSdbRow *pRow);
void *sdbGetRowObj(SSdbRow *pRow);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -133,7 +133,6 @@ int32_t* taosGetErrno(); ...@@ -133,7 +133,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") #define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322) #define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0322)
...@@ -143,7 +142,7 @@ int32_t* taosGetErrno(); ...@@ -143,7 +142,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326) #define TSDB_CODE_SDB_INVALID_STATUS_TYPE TAOS_DEF_ERROR_CODE(0, 0x0326)
#define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327) #define TSDB_CODE_SDB_INVALID_DATA_VER TAOS_DEF_ERROR_CODE(0, 0x0327)
#define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328) #define TSDB_CODE_SDB_INVALID_DATA_LEN TAOS_DEF_ERROR_CODE(0, 0x0328)
#define TSDB_CODE_SDB_INVALID_META_ROW TAOS_DEF_ERROR_CODE(0, 0x0329) #define TSDB_CODE_SDB_INVALID_DATA_CONTENT TAOS_DEF_ERROR_CODE(0, 0x0329)
#define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists") #define TSDB_CODE_MND_DNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0330) //"DNode already exists")
#define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist") #define TSDB_CODE_MND_DNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0331) //"DNode does not exist")
......
...@@ -16,69 +16,56 @@ ...@@ -16,69 +16,56 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mnodeInt.h" #include "mnodeInt.h"
#define ACCT_VER 1 #define SDB_ACCT_VER 1
static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) {
SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw)); SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, SDB_ACCT_VER, sizeof(SAcctObj));
if (pRaw == NULL) { if (pRaw == NULL) return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; int32_t dataPos = 0;
} SDB_SET_BINARY(pRaw, dataPos, pAcct->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pAcct->createdTime)
SDB_SET_INT64(pRaw, dataPos, pAcct->updateTime)
SDB_SET_INT32(pRaw, dataPos, pAcct->acctId)
SDB_SET_INT32(pRaw, dataPos, pAcct->status)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxUsers)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxDbs)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTimeSeries)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxStreams)
SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState)
SDB_SET_DATALEN(pRaw, dataPos);
int32_t dataLen = 0;
char *pData = pRaw->data;
SDB_SET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN)
SDB_SET_INT64_VAL(pData, dataLen, pAcct->createdTime)
SDB_SET_INT64_VAL(pData, dataLen, pAcct->updateTime)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->acctId)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->status)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams)
SDB_SET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage)
SDB_SET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState)
pRaw->dataLen = dataLen;
pRaw->type = SDB_ACCT;
pRaw->sver = ACCT_VER;
return pRaw; return pRaw;
} }
static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) { static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
if (pRaw->sver != ACCT_VER) { int8_t sver = 0;
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
return NULL;
}
SAcctObj *pAcct = calloc(1, sizeof(SAcctObj)); if (sver != SDB_ACCT_VER) {
if (pAcct == NULL) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t code = 0;
int32_t dataLen = pRaw->dataLen;
char *pData = pRaw->data;
SDB_GET_BINARY_VAL(pData, dataLen, pAcct->acct, TSDB_USER_LEN, code)
SDB_GET_INT64_VAL(pData, dataLen, pAcct->createdTime, code)
SDB_GET_INT64_VAL(pData, dataLen, pAcct->updateTime, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->acctId, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->status, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxUsers, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxDbs, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxTimeSeries, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.maxStreams, code)
SDB_GET_INT64_VAL(pData, dataLen, pAcct->cfg.maxStorage, code)
SDB_GET_INT32_VAL(pData, dataLen, pAcct->cfg.accessState, code)
if (code != 0) {
tfree(pAcct);
terrno = code;
return NULL; return NULL;
} }
return pAcct; SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
SAcctObj *pAcct = sdbGetRowObj(pRow);
if (pAcct == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pAcct->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->updateTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->acctId)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->status)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxUsers)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxDbs)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxTimeSeries)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.maxStreams)
SDB_GET_INT64(pRaw, pRow, dataPos, &pAcct->cfg.maxStorage)
SDB_GET_INT32(pRaw, pRow, dataPos, &pAcct->cfg.accessState)
return pRow;
} }
static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; } static int32_t mnodeAcctActionInsert(SAcctObj *pAcct) { return 0; }
...@@ -106,9 +93,7 @@ static int32_t mnodeCreateDefaultAcct() { ...@@ -106,9 +93,7 @@ static int32_t mnodeCreateDefaultAcct() {
.accessState = TSDB_VN_ALL_ACCCESS}; .accessState = TSDB_VN_ALL_ACCCESS};
SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj); SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj);
if (pRaw == NULL) { if (pRaw == NULL) return -1;
return -1;
}
return sdbWrite(pRaw); return sdbWrite(pRaw);
} }
......
...@@ -21,8 +21,8 @@ int32_t mnodeInitSync() { return 0; } ...@@ -21,8 +21,8 @@ int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {} void mnodeCleanUpSync() {}
int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) { int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) {
trnApply(pRaw, pData, 0); trnApply(pData, pData, 0);
free(pRaw); free(pData);
return 0; return 0;
} }
......
...@@ -19,59 +19,46 @@ ...@@ -19,59 +19,46 @@
#include "tglobal.h" #include "tglobal.h"
#include "tkey.h" #include "tkey.h"
#define USER_VER 1 #define SDB_USER_VER 1
static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) {
SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw)); SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj));
if (pRaw == NULL) { if (pRaw == NULL) return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; int32_t dataPos = 0;
} SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_KEY_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth)
SDB_SET_DATALEN(pRaw, dataPos);
int32_t dataLen = 0;
char *pData = pRaw->data;
SDB_SET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN)
SDB_SET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN)
SDB_SET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_KEY_LEN)
SDB_SET_INT64_VAL(pData, dataLen, pUser->createdTime)
SDB_SET_INT64_VAL(pData, dataLen, pUser->updateTime)
SDB_SET_INT32_VAL(pData, dataLen, pUser->rootAuth)
pRaw->dataLen = dataLen;
pRaw->type = SDB_USER;
pRaw->sver = USER_VER;
return pRaw; return pRaw;
} }
static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { static SSdbRow *mnodeUserActionDecode(SSdbRaw *pRaw) {
if (pRaw->sver != USER_VER) { int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_USER_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL; return NULL;
} }
SUserObj *pUser = calloc(1, sizeof(SUserObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
if (pUser == NULL) { SUserObj *pUser = sdbGetRowObj(pRow);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (pUser == NULL) return NULL;
return NULL;
}
int32_t code = 0; int32_t dataPos = 0;
int32_t dataLen = pRaw->dataLen; SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN)
char *pData = pRaw->data; SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_GET_BINARY_VAL(pData, dataLen, pUser->user, TSDB_USER_LEN, code) SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_BINARY_VAL(pData, dataLen, pUser->pass, TSDB_KEY_LEN, code) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_BINARY_VAL(pData, dataLen, pUser->acct, TSDB_USER_LEN, code) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT64_VAL(pData, dataLen, pUser->createdTime, code) SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth)
SDB_GET_INT64_VAL(pData, dataLen, pUser->updateTime, code)
SDB_GET_INT32_VAL(pData, dataLen, pUser->rootAuth, code)
if (code != 0) { return pRow;
tfree(pUser);
terrno = code;
return NULL;
}
return pUser;
} }
static int32_t mnodeUserActionInsert(SUserObj *pUser) { static int32_t mnodeUserActionInsert(SUserObj *pUser) {
...@@ -122,9 +109,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { ...@@ -122,9 +109,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
} }
SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pRaw = mnodeUserActionEncode(&userObj);
if (pRaw == NULL) { if (pRaw == NULL) return -1;
return -1;
}
return sdbWrite(pRaw); return sdbWrite(pRaw);
} }
...@@ -156,32 +141,31 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -156,32 +141,31 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK); STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK);
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle);
SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj);
if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
pRedoRaw->status = SDB_STATUS_CREATING; sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
pRedoRaw->action = SDB_ACTION_INSERT; sdbSetRawAction(pRedoRaw, SDB_ACTION_INSERT);
SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj);
if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
pUndoRaw->status = SDB_STATUS_DROPPING; sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPING);
pUndoRaw->action = SDB_ACTION_DELETE; sdbSetRawAction(pUndoRaw, SDB_ACTION_DELETE);
SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj);
if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
pCommitRaw->status = SDB_STATUS_READY; sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
pCommitRaw->action = SDB_ACTION_UPDATE; sdbSetRawAction(pCommitRaw, SDB_ACTION_UPDATE);
trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle);
if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { if (trnPrepare(pTrans, mnodeSyncPropose) != 0) {
trnDrop(pTrans); trnDrop(pTrans);
......
...@@ -36,6 +36,23 @@ extern "C" { ...@@ -36,6 +36,23 @@ extern "C" {
#define SDB_MAX_SIZE (32 * 1024) #define SDB_MAX_SIZE (32 * 1024)
typedef struct SSdbRaw {
int8_t sdb;
int8_t sver;
int8_t status;
int8_t action;
int8_t reserved[4];
int32_t cksum;
int32_t dataLen;
char pData[];
} SSdbRaw;
typedef struct SSdbRow {
ESdbStatus status;
int32_t refCount;
char pObj[];
} SSdbRow;
typedef struct { typedef struct {
char *currDir; char *currDir;
char *syncDir; char *syncDir;
...@@ -53,12 +70,7 @@ typedef struct { ...@@ -53,12 +70,7 @@ typedef struct {
SdbDecodeFp decodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX];
} SSdbMgr; } SSdbMgr;
typedef struct { extern SSdbMgr tsSdb;
ESdbStatus status;
int32_t refCount;
int32_t dataLen;
char pData[];
} SSdbRow;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,224 +17,7 @@ ...@@ -17,224 +17,7 @@
#include "sdbInt.h" #include "sdbInt.h"
#include "tglobal.h" #include "tglobal.h"
static SSdbMgr tsSdb = {0}; SSdbMgr tsSdb = {0};
static int32_t sdbCreateDir() {
if (!taosMkDir(tsSdb.currDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr());
return -1;
}
if (!taosMkDir(tsSdb.syncDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr());
return -1;
}
if (!taosMkDir(tsSdb.tmpDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr());
return -1;
}
return 0;
}
static int32_t sdbRunDeployFp() {
for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
SdbDeployFp fp = tsSdb.deployFps[i];
if (fp == NULL) continue;
if ((*fp)() != 0) {
mError("failed to deploy sdb:%d since %s", i, terrstr());
return -1;
}
}
return 0;
}
static SHashObj *sdbGetHash(int32_t sdb) {
if (sdb >= SDB_MAX || sdb <= SDB_START) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
return NULL;
}
SHashObj *hash = tsSdb.hashObjs[sdb];
if (hash == NULL) {
terrno = TSDB_CODE_SDB_APP_ERROR;
return NULL;
}
return hash;
}
int32_t sdbWrite(SSdbRaw *pRaw) {
SHashObj *hash = sdbGetHash(pRaw->type);
switch (pRaw->action) {
case SDB_ACTION_INSERT:
break;
case SDB_ACTION_UPDATE:
break;
case SDB_ACTION_DELETE:
break;
default:
break;
}
return 0;
}
static int32_t sdbWriteVersion(FileFd fd) { return 0; }
static int32_t sdbReadVersion(FileFd fd) { return 0; }
static int32_t sdbReadDataFile() {
int32_t code = 0;
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
FileFd fd = taosOpenFileCreateWrite(file);
if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for read since %s", file, tstrerror(code));
return code;
}
int64_t offset = 0;
while (1) {
int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw));
if (ret == 0) break;
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code));
break;
}
if (ret < sizeof(SSdbRaw)) {
code = TSDB_CODE_SDB_APP_ERROR;
mError("failed to read file:%s since %s", file, tstrerror(code));
break;
}
code = sdbWrite(pRaw);
if (code != 0) {
mError("failed to read file:%s since %s", file, tstrerror(code));
goto PARSE_SDB_DATA_ERROR;
}
}
code = 0;
PARSE_SDB_DATA_ERROR:
taosCloseFile(fd);
return code;
}
static int32_t sdbWriteDataFile() {
int32_t code = 0;
char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);
FileFd fd = taosOpenFileCreateWrite(tmpfile);
if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code));
return code;
}
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SHashObj *hash = tsSdb.hashObjs[i];
if (!hash) continue;
SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
if (!encodeFp) continue;
SSdbRow *pRow = taosHashIterate(hash, NULL);
while (pRow != NULL) {
if (pRow->status == SDB_STATUS_READY) continue;
SSdbRaw *pRaw = (*encodeFp)(pRow->pData);
if (pRaw != NULL) {
taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
} else {
taosHashCancelIterate(hash, pRow);
code = TSDB_CODE_SDB_APP_ERROR;
break;
}
pRow = taosHashIterate(hash, pRow);
}
}
if (code == 0) {
code = sdbWriteVersion(fd);
}
taosCloseFile(fd);
if (code == 0) {
code = taosFsyncFile(fd);
}
if (code != 0) {
char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
code = taosRenameFile(tmpfile, curfile);
}
if (code != 0) {
mError("failed to write sdb file since %s", tstrerror(code));
} else {
mInfo("write sdb file successfully");
}
return code;
}
int32_t sdbRead() {
int32_t code = sdbReadDataFile();
if (code != 0) {
return code;
}
mInfo("read sdb file successfully");
return -1;
}
int32_t sdbCommit() {
int32_t code = sdbWriteDataFile();
if (code != 0) {
return code;
}
return 0;
}
int32_t sdbDeploy() {
if (sdbCreateDir() != 0) {
return -1;
}
if (sdbRunDeployFp() != 0) {
return -1;
}
if (sdbCommit() != 0) {
return -1;
}
return 0;
}
void sdbUnDeploy() {}
int32_t sdbInit() { int32_t sdbInit() {
char path[PATH_MAX + 100]; char path[PATH_MAX + 100];
...@@ -309,108 +92,4 @@ void sdbSetTable(SSdbTable table) { ...@@ -309,108 +92,4 @@ void sdbSetTable(SSdbTable table) {
tsSdb.deployFps[sdb] = table.deployFp; tsSdb.deployFps[sdb] = table.deployFp;
tsSdb.encodeFps[sdb] = table.encodeFp; tsSdb.encodeFps[sdb] = table.encodeFp;
tsSdb.decodeFps[sdb] = table.decodeFp; tsSdb.decodeFps[sdb] = table.decodeFp;
}
#if 0
void *sdbInsertRow(ESdbType sdb, void *p) {
SdbHead *pHead = p;
pHead->type = sdb;
pHead->status = SDB_AVAIL;
char *pKey = (char *)pHead + sizeof(pHead);
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[pHead->type];
int32_t dataSize = tsSdb.dataSize[pHead->type];
SHashObj *hash = sdbGetHash(pHead->type);
if (hash == NULL) {
return NULL;
}
if (keyType == SDBINT32) {
keySize = sizeof(int32_t);
} else if (keyType == SDB_KEY_BINARY) {
keySize = strlen(pKey) + 1;
} else {
keySize = sizeof(int64_t);
}
taosHashPut(hash, pKey, keySize, pHead, dataSize);
return taosHashGet(hash, pKey, keySize);
}
void sdbDeleteRow(ESdbType sdb, void *p) {
SdbHead *pHead = p;
pHead->status = SDB_STATUS_DROPPED;
}
void *sdbUpdateRow(ESdbType sdb, void *pHead) { return sdbInsertRow(sdb, pHead); }
#endif
void *sdbAcquire(ESdbType sdb, void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[sdb];
switch (keyType) {
case SDB_KEY_INT32:
keySize = sizeof(int32_t);
break;
case SDB_KEY_INT64:
keySize = sizeof(int64_t);
break;
case SDB_KEY_BINARY:
keySize = strlen(pKey) + 1;
break;
default:
keySize = sizeof(int32_t);
}
SSdbRow *pRow = taosHashGet(hash, pKey, keySize);
if (pRow == NULL) return NULL;
if (pRow->status == SDB_STATUS_READY) {
atomic_add_fetch_32(&pRow->refCount, 1);
return pRow->pData;
} else {
terrno = -1; // todo
return NULL;
}
}
void sdbRelease(void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
atomic_sub_fetch_32(&pRow->refCount, 1);
}
void *sdbFetchRow(ESdbType sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
return taosHashIterate(hash, pIter);
}
void sdbCancelFetch(ESdbType sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return;
}
taosHashCancelIterate(hash, pIter);
}
int32_t sdbGetSize(ESdbType sdb) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return 0;
}
return taosHashGetSize(hash);
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
#include "tglobal.h"
static int32_t sdbCreateDir() {
if (!taosMkDir(tsSdb.currDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.currDir, terrstr());
return -1;
}
if (!taosMkDir(tsSdb.syncDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.syncDir, terrstr());
return -1;
}
if (!taosMkDir(tsSdb.tmpDir)) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", tsSdb.tmpDir, terrstr());
return -1;
}
return 0;
}
static int32_t sdbRunDeployFp() {
for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
SdbDeployFp fp = tsSdb.deployFps[i];
if (fp == NULL) continue;
if ((*fp)() != 0) {
mError("failed to deploy sdb:%d since %s", i, terrstr());
return -1;
}
}
return 0;
}
static int32_t sdbWriteVersion(FileFd fd) { return 0; }
static int32_t sdbReadVersion(FileFd fd) { return 0; }
static int32_t sdbReadDataFile() {
int32_t code = 0;
SSdbRaw *pRaw = malloc(SDB_MAX_SIZE);
if (pRaw == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%ssdb.data", tsSdb.currDir);
FileFd fd = taosOpenFileCreateWrite(file);
if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for read since %s", file, tstrerror(code));
return code;
}
int64_t offset = 0;
while (1) {
int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw));
if (ret == 0) break;
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code));
break;
}
if (ret < sizeof(SSdbRaw)) {
code = TSDB_CODE_SDB_APP_ERROR;
mError("failed to read file:%s since %s", file, tstrerror(code));
break;
}
code = sdbWrite(pRaw);
if (code != 0) {
mError("failed to read file:%s since %s", file, tstrerror(code));
goto PARSE_SDB_DATA_ERROR;
}
}
code = 0;
PARSE_SDB_DATA_ERROR:
taosCloseFile(fd);
return code;
}
static int32_t sdbWriteDataFile() {
int32_t code = 0;
char tmpfile[PATH_MAX] = {0};
snprintf(tmpfile, sizeof(tmpfile), "%ssdb.data", tsSdb.tmpDir);
FileFd fd = taosOpenFileCreateWrite(tmpfile);
if (fd <= 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, tstrerror(code));
return code;
}
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SHashObj *hash = tsSdb.hashObjs[i];
if (!hash) continue;
SdbEncodeFp encodeFp = tsSdb.encodeFps[i];
if (!encodeFp) continue;
SSdbRow *pRow = taosHashIterate(hash, NULL);
while (pRow != NULL) {
if (pRow->status == SDB_STATUS_READY) continue;
SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
if (pRaw != NULL) {
taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen);
} else {
taosHashCancelIterate(hash, pRow);
code = TSDB_CODE_SDB_APP_ERROR;
break;
}
pRow = taosHashIterate(hash, pRow);
}
}
if (code == 0) {
code = sdbWriteVersion(fd);
}
taosCloseFile(fd);
if (code == 0) {
code = taosFsyncFile(fd);
}
if (code != 0) {
char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%ssdb.data", tsSdb.currDir);
code = taosRenameFile(tmpfile, curfile);
}
if (code != 0) {
mError("failed to write sdb file since %s", tstrerror(code));
} else {
mInfo("write sdb file successfully");
}
return code;
}
int32_t sdbRead() {
int32_t code = sdbReadDataFile();
if (code != 0) {
return code;
}
mInfo("read sdb file successfully");
return -1;
}
int32_t sdbCommit() {
int32_t code = sdbWriteDataFile();
if (code != 0) {
return code;
}
return 0;
}
int32_t sdbDeploy() {
if (sdbCreateDir() != 0) {
return -1;
}
if (sdbRunDeployFp() != 0) {
return -1;
}
if (sdbCommit() != 0) {
return -1;
}
return 0;
}
void sdbUnDeploy() {}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
#include "tglobal.h"
static SHashObj *sdbGetHash(int32_t sdb) {
if (sdb >= SDB_MAX || sdb <= SDB_START) {
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
return NULL;
}
SHashObj *hash = tsSdb.hashObjs[sdb];
if (hash == NULL) {
terrno = TSDB_CODE_SDB_APP_ERROR;
return NULL;
}
return hash;
}
static int32_t sdbInsertRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL;
if (pDstRow != NULL) {
terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE;
return -1;
}
pRow->refCount = 0;
pRow->status = pRaw->status;
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SdbInsertFp insertFp = tsSdb.insertFps[pRaw->sdb];
if (insertFp != NULL) {
if ((*insertFp)(pRow->pObj) != 0) {
taosHashRemove(hash, pRow->pObj, keySize);
return -1;
}
}
return 0;
}
static int32_t sdbUpdateRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL;
if (pDstRow == NULL) {
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return -1;
}
SdbUpdateFp updateFp = tsSdb.updateFps[pRaw->sdb];
if (updateFp != NULL) {
if ((*updateFp)(pRow->pObj, pDstRow->pObj) != 0) {
return -1;
}
}
return 0;
}
static int32_t sdbDeleteRow(SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) {
SSdbRow *pDstRow = NULL;
if (pDstRow == NULL) {
terrno = TSDB_CODE_SDB_OBJ_NOT_THERE;
return -1;
}
SdbDeleteFp deleteFp = tsSdb.deleteFps[pRaw->sdb];
if (deleteFp != NULL) {
if ((*deleteFp)(pRow->pObj) != 0) {
return -1;
}
}
taosHashRemove(hash, pRow->pObj, keySize);
return 0;
}
int32_t sdbWrite(SSdbRaw *pRaw) {
SHashObj *hash = sdbGetHash(pRaw->sdb);
if (hash == NULL) return -1;
SdbDecodeFp decodeFp = tsSdb.decodeFps[pRaw->sdb];
SSdbRow *pRow = (*decodeFp)(pRaw);
if (pRow == NULL) {
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
return -1;
}
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[pRaw->sdb];
if (keyType == SDB_KEY_INT32) {
keySize = sizeof(int32_t);
} else if (keyType == SDB_KEY_BINARY) {
keySize = strlen(pRow->pObj) + 1;
} else {
keySize = sizeof(int64_t);
}
int32_t code = -1;
if (pRaw->action == SDB_ACTION_INSERT) {
code = sdbInsertRow(hash, pRaw, pRow, keySize);
} else if (pRaw->action == SDB_ACTION_UPDATE) {
code = sdbUpdateRow(hash, pRaw, pRow, keySize);
} else if (pRaw->action == SDB_ACTION_DELETE) {
code = sdbDeleteRow(hash, pRaw, pRow, keySize);
} else {
terrno = TSDB_CODE_SDB_INVALID_ACTION_TYPE;
}
if (code != 0) {
sdbFreeRow(pRow);
}
return 0;
}
void *sdbAcquire(ESdbType sdb, void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
int32_t keySize;
EKeyType keyType = tsSdb.keyTypes[sdb];
switch (keyType) {
case SDB_KEY_INT32:
keySize = sizeof(int32_t);
break;
case SDB_KEY_INT64:
keySize = sizeof(int64_t);
break;
case SDB_KEY_BINARY:
keySize = strlen(pKey) + 1;
break;
default:
keySize = sizeof(int32_t);
}
SSdbRow *pRow = taosHashGet(hash, pKey, keySize);
if (pRow == NULL) return NULL;
if (pRow->status == SDB_STATUS_READY) {
atomic_add_fetch_32(&pRow->refCount, 1);
return pRow->pObj;
} else {
terrno = -1; // todo
return NULL;
}
}
void sdbRelease(void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
atomic_sub_fetch_32(&pRow->refCount, 1);
}
void *sdbFetchRow(ESdbType sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return NULL;
}
return taosHashIterate(hash, pIter);
}
void sdbCancelFetch(ESdbType sdb, void *pIter) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return;
}
taosHashCancelIterate(hash, pIter);
}
int32_t sdbGetSize(ESdbType sdb) {
SHashObj *hash = sdbGetHash(sdb);
if (hash == NULL) {
return 0;
}
return taosHashGetSize(hash);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
SSdbRaw *sdbAllocRaw(ESdbType sdb, int8_t sver, int32_t dataLen) {
SSdbRaw *pRaw = calloc(1, dataLen + sizeof(SSdbRaw));
if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRaw->sdb = sdb;
pRaw->sver = sver;
pRaw->dataLen = dataLen;
return pRaw;
}
void sdbFreeRaw(SSdbRaw *pRaw) { free(pRaw); }
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int8_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*(int8_t *)(pRaw->pData + dataPos) = val;
return 0;
}
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int32_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*(int32_t *)(pRaw->pData + dataPos) = val;
return 0;
}
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int64_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*(int64_t *)(pRaw->pData + dataPos) = val;
return 0;
}
int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + valLen > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
memcpy(pRaw->pData + dataPos, pVal, valLen);
return 0;
}
int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataLen > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
pRaw->dataLen = dataLen;
return 0;
}
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
pRaw->status = status;
return 0;
}
int32_t sdbSetRawAction(SSdbRaw *pRaw, ESdbAction action) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
pRaw->action = action;
return 0;
}
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int8_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*val = *(int8_t *)(pRaw->pData + dataPos);
return 0;
}
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int32_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*val = *(int32_t *)(pRaw->pData + dataPos);
return 0;
}
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + sizeof(int64_t) > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
*val = *(int64_t *)(pRaw->pData + dataPos);
return 0;
}
int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
if (dataPos + valLen > pRaw->dataLen) {
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
return -1;
}
memcpy(pVal, pRaw->pData + dataPos, valLen);
return 0;
}
int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
*sver = pRaw->sver;
return 0;
}
int32_t sdbGetRawTotalSize(SSdbRaw *pRaw) {
if (pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
return sizeof(SSdbRaw) + pRaw->dataLen;
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbInt.h"
SSdbRow *sdbAllocRow(int32_t objSize) {
SSdbRow *pRow = calloc(1, objSize + sizeof(SSdbRow));
if (pRow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pRow;
}
void *sdbGetRowObj(SSdbRow *pRow) {
if (pRow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
return pRow->pObj;
}
void sdbFreeRow(SSdbRow *pRow) { free(pRow); }
...@@ -45,15 +45,15 @@ typedef enum { ...@@ -45,15 +45,15 @@ typedef enum {
} ETrnStage; } ETrnStage;
typedef struct STrans { typedef struct STrans {
int32_t id; int32_t id;
ETrnStage stage; int8_t stage;
ETrnPolicy policy; int8_t policy;
void *rpcHandle; void *rpcHandle;
SArray *redoLogs; SArray *redoLogs;
SArray *undoLogs; SArray *undoLogs;
SArray *commitLogs; SArray *commitLogs;
SArray *redoActions; SArray *redoActions;
SArray *undoActions; SArray *undoActions;
} STrans; } STrans;
SSdbRaw *trnActionEncode(STrans *pTrans); SSdbRaw *trnActionEncode(STrans *pTrans);
......
...@@ -16,8 +16,10 @@ ...@@ -16,8 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "trnInt.h" #include "trnInt.h"
#define SDB_TRANS_VER 1
SSdbRaw *trnActionEncode(STrans *pTrans) { SSdbRaw *trnActionEncode(STrans *pTrans) {
int32_t rawDataLen = 5 * sizeof(int32_t); int32_t rawDataLen = 10 * sizeof(int32_t);
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
...@@ -25,91 +27,84 @@ SSdbRaw *trnActionEncode(STrans *pTrans) { ...@@ -25,91 +27,84 @@ SSdbRaw *trnActionEncode(STrans *pTrans) {
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t index = 0; index < redoLogNum; ++index) { for (int32_t index = 0; index < redoLogNum; ++index) {
SSdbRaw *pRawData = taosArrayGet(pTrans->redoLogs, index); SSdbRaw *pRaw = taosArrayGet(pTrans->redoLogs, index);
rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); rawDataLen += sdbGetRawTotalSize(pRaw);
} }
for (int32_t index = 0; index < undoLogNum; ++index) { for (int32_t index = 0; index < undoLogNum; ++index) {
SSdbRaw *pRawData = taosArrayGet(pTrans->undoLogs, index); SSdbRaw *pRaw = taosArrayGet(pTrans->undoLogs, index);
rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); rawDataLen += sdbGetRawTotalSize(pRaw);
} }
for (int32_t index = 0; index < commitLogNum; ++index) { for (int32_t index = 0; index < commitLogNum; ++index) {
SSdbRaw *pRawData = taosArrayGet(pTrans->commitLogs, index); SSdbRaw *pRaw = taosArrayGet(pTrans->commitLogs, index);
rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); rawDataLen += sdbGetRawTotalSize(pRaw);
} }
SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw)); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
if (pRaw == NULL) { if (pRaw == NULL) return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; int32_t dataPos = 0;
} SDB_SET_INT32(pData, dataPos, pTrans->id)
SDB_SET_INT8(pData, dataPos, pTrans->stage)
SDB_SET_INT8(pData, dataPos, pTrans->policy)
SDB_SET_INT32(pData, dataPos, redoLogNum)
SDB_SET_INT32(pData, dataPos, undoLogNum)
SDB_SET_INT32(pData, dataPos, commitLogNum)
SDB_SET_INT32(pData, dataPos, redoActionNum)
SDB_SET_INT32(pData, dataPos, undoActionNum)
SDB_SET_DATALEN(pRaw, dataPos);
int32_t dataLen = 0;
char *pData = pRaw->data;
SDB_SET_INT32_VAL(pData, dataLen, redoLogNum)
SDB_SET_INT32_VAL(pData, dataLen, undoLogNum)
SDB_SET_INT32_VAL(pData, dataLen, commitLogNum)
SDB_SET_INT32_VAL(pData, dataLen, redoActionNum)
SDB_SET_INT32_VAL(pData, dataLen, undoActionNum)
pRaw->dataLen = dataLen;
pRaw->type = SDB_TRANS;
pRaw->sver = TRN_VER;
return pRaw; return pRaw;
} }
STrans *trnActionDecode(SSdbRaw *pRaw) { STrans *trnActionDecode(SSdbRaw *pRaw) {
if (pRaw->sver != TRN_VER) { int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_TRANS_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL; return NULL;
} }
STrans *pTrans = NULL; SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
if (pTrans == NULL) { STrans *pTrans = sdbGetRowObj(pRow);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (pTrans == NULL) return NULL;
return NULL;
}
int32_t redoLogNum = 0; int32_t redoLogNum = 0;
int32_t undoLogNum = 0; int32_t undoLogNum = 0;
int32_t commitLogNum = 0; int32_t commitLogNum = 0;
int32_t redoActionNum = 0; int32_t redoActionNum = 0;
int32_t undoActionNum = 0; int32_t undoActionNum = 0;
SSdbRaw *pTmp = malloc(sizeof(SSdbRaw));
int32_t code = 0; int32_t dataPos = 0;
int32_t dataLen = pRaw->dataLen; SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
char *pData = pRaw->data; SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->stage)
SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->policy)
SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
for (int32_t index = 0; index < redoLogNum; ++index) { for (int32_t index = 0; index < redoLogNum; ++index) {
SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRaw), code); int32_t dataLen = 0;
if (code == 0 && pTmp->dataLen > 0) { SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen);
if (pRead == NULL) { char *pData = malloc(dataLen);
code = TSDB_CODE_OUT_OF_MEMORY; SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
break; void *ret = taosArrayPush(pTrans->redoLogs, pData);
} if (ret == NULL) {
memcpy(pRead, pTmp, sizeof(SSdbRaw)); terrno = TSDB_CODE_OUT_OF_MEMORY;
SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); break;
void *ret = taosArrayPush(pTrans->redoLogs, &pRead);
if (ret == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
} }
} }
if (code != 0) { // if (code != 0) {
trnDrop(pTrans); // trnDrop(pTrans);
terrno = code; // terrno = code;
return NULL; // return NULL;
} // }
return pTrans; return pTrans;
} }
......
...@@ -52,7 +52,7 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) { ...@@ -52,7 +52,7 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
return 0; return 0;
} }
if (sdbWrite(pRaw) != 0) { if (sdbWrite(pData) != 0) {
code = terrno; code = terrno;
trnSendRpcRsp(pData, code); trnSendRpcRsp(pData, code);
terrno = code; terrno = code;
......
...@@ -153,7 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type") ...@@ -153,7 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_ACTION_TYPE, "Invalid action type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_STATUS_TYPE, "Invalid status type")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_VER, "Invalid raw data version")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_LEN, "Invalid raw data len")
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_META_ROW, "Invalid meta row") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_INVALID_DATA_CONTENT, "Invalid raw data content")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ALREADY_EXIST, "DNode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_EXIST, "DNode does not exist")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册