提交 622cdc39 编写于 作者: S Shengliang Guan

add transaction interface

上级 79519165
......@@ -399,8 +399,6 @@ typedef struct {
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
int8_t privilege;
int8_t flag;
} SCreateUserMsg, SAlterUserMsg;
typedef struct {
......
......@@ -93,7 +93,7 @@ typedef enum {
typedef enum { SDB_ACTION_INSERT = 1, SDB_ACTION_UPDATE = 2, SDB_ACTION_DELETE = 3 } ESdbAction;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY, SDB_STATUS_DROPPING, SDB_STATUS_DROPPED } ESdbStatus;
typedef enum { SDB_STATUS_CREATING = 1, SDB_STATUS_READY = 2, SDB_STATUS_DROPPING = 3 } ESdbStatus;
typedef struct {
int8_t type;
......@@ -136,7 +136,7 @@ int32_t sdbDeploy();
void sdbUnDeploy();
void *sdbAcquire(ESdbType sdb, void *pKey);
void sdbRelease(ESdbType sdb, void *pObj);
void sdbRelease(void *pObj);
void *sdbFetch(ESdbType sdb, void *pIter);
void sdbCancelFetch(ESdbType sdb, void *pIter);
int32_t sdbGetSize(ESdbType sdb);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TRANSACTION_H_
#define _TD_TRANSACTION_H_
#include "sdb.h"
#include "taosmsg.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
} STrans;
int32_t trnInit();
void trnCleanup();
STrans *trnCreate();
int32_t trnCommit(STrans *);
void trnDrop(STrans *);
void trnAppendRedoLog(STrans *, SSdbRawData *);
void trnAppendUndoLog(STrans *, SSdbRawData *);
void trnAppendCommitLog(STrans *, SSdbRawData *);
void trnAppendRedoAction(STrans *, SEpSet *, void *pMsg);
void trnAppendUndoAction(STrans *, SEpSet *, void *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_TRANSACTION_H_*/
......@@ -8,6 +8,7 @@ target_include_directories(
target_link_libraries(
mnode
PRIVATE sdb
PRIVATE transaction
PUBLIC transport
PUBLIC cjson
)
\ No newline at end of file
......@@ -24,7 +24,6 @@
#include "thash.h"
#include "cJSON.h"
#include "mnode.h"
#include "sdb.h"
#ifdef __cplusplus
extern "C" {
......@@ -268,6 +267,7 @@ typedef struct {
typedef struct SMnMsg {
void (*fp)(SMnMsg *pMsg, int32_t code);
char user[TSDB_USER_LEN];
SUserObj *pUser;
int16_t received;
int16_t successed;
......
......@@ -17,6 +17,8 @@
#define _TD_MNODE_INT_H_
#include "mnodeDef.h"
#include "sdb.h"
#include "trn.h"
#ifdef __cplusplus
extern "C" {
......
......@@ -95,7 +95,7 @@ static int32_t mnodeUserActionDelete(SUserObj *pUser) {
}
if (pUser->acct != NULL) {
sdbRelease(SDB_ACCT, pUser->pAcct);
sdbRelease(pUser->pAcct);
pUser->pAcct = NULL;
}
......@@ -115,7 +115,7 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) {
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.rootAuth = 1;
......@@ -144,6 +144,98 @@ static int32_t mnodeCreateDefaultUsers() {
return code;
}
static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) {
int32_t code = 0;
STrans *pTrans = NULL;
SSdbRawData *pUndoRaw = NULL;
SSdbRawData *pRedoRaw = NULL;
SUserObj userObj = {0};
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.rootAuth = 0;
pRedoRaw = mnodeUserActionEncode(&userObj);
if (pRedoRaw == NULL) {
code = terrno;
goto CREATE_USER_OVER;
}
pRedoRaw->status = SDB_STATUS_READY;
pRedoRaw->action = SDB_ACTION_INSERT;
pUndoRaw = mnodeUserActionEncode(&userObj);
if (pUndoRaw == NULL) {
code = terrno;
goto CREATE_USER_OVER;
}
pUndoRaw->status = SDB_STATUS_DROPPING;
pUndoRaw->action = SDB_ACTION_DELETE;
pTrans = trnCreate();
if (pTrans == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto CREATE_USER_OVER;
}
trnAppendRedoLog(pTrans, pRedoRaw);
trnAppendUndoLog(pTrans, pUndoRaw);
code = trnCommit(pTrans);
CREATE_USER_OVER:
if (code != 0) {
trnDrop(pTrans);
free(pRedoRaw);
free(pUndoRaw);
}
return code;
}
static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) {
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
int32_t code = TSDB_CODE_SUCCESS;
if (pCreate->user[0] == 0) {
code = TSDB_CODE_MND_INVALID_USER_FORMAT;
mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code));
return code;
}
if (pCreate->pass[0] == 0) {
code = TSDB_CODE_MND_INVALID_PASS_FORMAT;
mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code));
return code;
}
SUserObj *pUser = sdbAcquire(SDB_USER, pCreate->user);
if (pUser != NULL) {
sdbRelease(pUser);
code = TSDB_CODE_MND_USER_ALREADY_EXIST;
mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code));
return code;
}
SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->user);
if (pOperUser == NULL) {
code = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code));
return code;
}
code = mnodeCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg);
sdbRelease(pOperUser);
if (code != 0) {
mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code));
return code;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
int32_t mnodeInitUser() {
SSdbDesc desc = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
......
......@@ -10,5 +10,4 @@ target_link_libraries(
PRIVATE os
PRIVATE common
PRIVATE util
PRIVATE cjson
)
\ No newline at end of file
......@@ -377,7 +377,7 @@ void *sdbAcquire(ESdbType sdb, void *pKey) {
}
}
void sdbRelease(ESdbType sdb, void *pObj) {
void sdbRelease(void *pObj) {
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
atomic_sub_fetch_32(&pRow->refCount, 1);
}
......
......@@ -7,5 +7,8 @@ target_include_directories(
)
target_link_libraries(
transaction
PUBLIC transport
)
\ No newline at end of file
PRIVATE os
PRIVATE common
PRIVATE util
PRIVATE sdb
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TRANSACTION_INT_H_
#define _TD_TRANSACTION_INT_H_
#include "os.h"
#include "trn.h"
#include "tglobal.h"
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
#endif
#endif /*_TD_TRANSACTION_INT_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "trnInt.h"
int32_t trnInit() { return 0; }
void trnCleanup();
STrans *trnCreate() { return NULL; }
int32_t trnCommit(STrans *pTrans) { return 0; }
void trnDrop(STrans *pTrans) {}
void trnAppendRedoLog(STrans *pTrans, SSdbRawData *pRaw) {}
void trnAppendUndoLog(STrans *pTrans, SSdbRawData *pRaw) {}
void trnAppendCommitLog(STrans *pTrans, SSdbRawData *pRaw) {}
void trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {}
void trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册