提交 c64976c2 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

first draft

上级 de7bbc0d
...@@ -184,6 +184,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -184,6 +184,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
if (pSql->res.code != TSDB_CODE_SUCCESS) { if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code; terrno = pSql->res.code;
if (pSql->res.code == TSDB_CODE_MND_INIT)
printf("mnode %s\n", pSql->res.pRsp);
taos_free_result(pSql); taos_free_result(pSql);
taos_close(pObj); taos_close(pObj);
return NULL; return NULL;
...@@ -650,11 +652,15 @@ char *taos_errstr(TAOS_RES *tres) { ...@@ -650,11 +652,15 @@ char *taos_errstr(TAOS_RES *tres) {
return (char*) tstrerror(terrno); return (char*) tstrerror(terrno);
} }
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { if (!hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
return pSql->cmd.payload; if (pSql->res.code == TSDB_CODE_MND_INIT) {
} else { sprintf(pSql->cmd.payload, "%s:%s", tstrerror(pSql->res.code), pSql->res.pRsp);
return (char*)tstrerror(pSql->res.code); } else {
sprintf(pSql->cmd.payload, "%s", tstrerror(pSql->res.code));
}
} }
return pSql->cmd.payload;
} }
void taos_config(int debug, char *log_path) { void taos_config(int debug, char *log_path) {
......
...@@ -122,11 +122,16 @@ void dnodeFreeMPeerQueue() { ...@@ -122,11 +122,16 @@ void dnodeFreeMPeerQueue() {
} }
void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) { void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMPeerQueue == NULL) { if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, false); dnodeSendRedirectMsg(pMsg, false);
} else { } else {
SMnodeMsg *pPeer = mnodeCreateMsg(pMsg); if (!mnodeIsReady()) {
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_MND_INIT, .pCont = tsMgmtInitStr, .contLen = sizeof(tsMgmtInitStr)};
rpcSendResponse(&rpcRsp);
} else {
SMnodeMsg *pPeer = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
}
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -123,11 +123,16 @@ void dnodeFreeMReadQueue() { ...@@ -123,11 +123,16 @@ void dnodeFreeMReadQueue() {
} }
void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMReadQueue == NULL) { if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, false);
} else { } else {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg); if (!mnodeIsReady()) {
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_MND_INIT, .pCont = tsMgmtInitStr, .contLen = sizeof(tsMgmtInitStr)};
rpcSendResponse(&rpcRsp);
} else {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
}
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
......
...@@ -123,13 +123,18 @@ void dnodeFreeMWritequeue() { ...@@ -123,13 +123,18 @@ void dnodeFreeMWritequeue() {
} }
void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) { void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) { if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, true); dnodeSendRedirectMsg(pMsg, false);
} else { } else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg); if (!mnodeIsReady()) {
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle, SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_MND_INIT, .pCont = tsMgmtInitStr, .contLen = sizeof(tsMgmtInitStr)};
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue); rpcSendResponse(&rpcRsp);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -186,12 +191,8 @@ static void *dnodeProcessMWriteQueue(void *param) { ...@@ -186,12 +191,8 @@ static void *dnodeProcessMWriteQueue(void *param) {
void dnodeReprocessMWriteMsg(void *pMsg) { void dnodeReprocessMWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg; SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning()) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) { dnodeSendRedirectMsg(pMsg, false);
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMWriteMsg(pWrite); dnodeFreeMWriteMsg(pWrite);
} else { } else {
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle, dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
......
...@@ -61,6 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = { ...@@ -61,6 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"dnodeeps", dnodeInitEps, dnodeCleanupEps}, {"dnodeeps", dnodeInitEps, dnodeCleanupEps},
{"globalcfg" ,taosCheckGlobalCfg, NULL}, {"globalcfg" ,taosCheckGlobalCfg, NULL},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"shell", dnodeInitShell, dnodeCleanupShell},
{"wal", walInit, walCleanUp}, {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVRead, dnodeCleanupVRead}, {"vread", dnodeInitVRead, dnodeCleanupVRead},
...@@ -73,7 +74,6 @@ static const SDnodeComponent tsDnodeComponents[] = { ...@@ -73,7 +74,6 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
{"modules", dnodeInitModules, dnodeCleanupModules}, {"modules", dnodeInitModules, dnodeCleanupModules},
{"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer}, {"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer},
{"shell", dnodeInitShell, dnodeCleanupShell},
{"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, {"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
}; };
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *); static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey, char *reason);
static void * tsShellRpc = NULL; static void * tsShellRpc = NULL;
static int32_t tsQueryReqNum = 0; static int32_t tsQueryReqNum = 0;
static int32_t tsSubmitReqNum = 0; static int32_t tsSubmitReqNum = 0;
...@@ -142,10 +142,13 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -142,10 +142,13 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
} }
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey, char *reason) {
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); *reason = 0;
if (code != TSDB_CODE_APP_NOT_READY) return code;
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey, reason);
if (code != TSDB_CODE_RPC_REDIRECT) return code;
// send the auth request to other mnodes
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user)); tstrncpy(pMsg->user, user, sizeof(pMsg->user));
...@@ -160,6 +163,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -160,6 +163,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
if (rpcRsp.contLen > 0) strncpy(reason, rpcRsp.pCont, TSDB_REASON_LEN);
} else { } else {
SAuthRsp *pRsp = rpcRsp.pCont; SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user); dDebug("user:%s, auth msg received from mnodes", user);
......
...@@ -65,11 +65,14 @@ void mnodeStopSystem(); ...@@ -65,11 +65,14 @@ void mnodeStopSystem();
void sdbUpdateAsync(); void sdbUpdateAsync();
void sdbUpdateSync(void *pMnodes); void sdbUpdateSync(void *pMnodes);
bool mnodeIsRunning(); bool mnodeIsRunning();
bool mnodeIsReady();
int32_t mnodeProcessRead(SMnodeMsg *pMsg); int32_t mnodeProcessRead(SMnodeMsg *pMsg);
int32_t mnodeProcessWrite(SMnodeMsg *pMsg); int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg); int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg);
void mnodeProcessPeerRsp(SRpcMsg *pMsg); void mnodeProcessPeerRsp(SRpcMsg *pMsg);
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey, char *reason);
extern char tsMgmtInitStr[TSDB_REASON_LEN];
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -272,6 +272,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -272,6 +272,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_LOCALE_LEN 64 #define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 96 #define TSDB_TIMEZONE_LEN 96
#define TSDB_LABEL_LEN 8 #define TSDB_LABEL_LEN 8
#define TSDB_REASON_LEN 80
#define TSDB_CLUSTER_ID_LEN 40 #define TSDB_CLUSTER_ID_LEN 40
#define TSDB_FQDN_LEN 128 #define TSDB_FQDN_LEN 128
......
...@@ -125,6 +125,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, 0, 0x030B, "Data expir ...@@ -125,6 +125,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, 0, 0x030B, "Data expir
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "Invalid query id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "Invalid query id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "Invalid stream id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "Invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "Invalid connection id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "Invalid connection id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_RUNNING, 0, 0x030F, "Mnode is not running")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_MASTER, 0, 0x0310, "Mnode is not master")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT, 0, 0x0311, "Mnode is initializing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "Object already there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "Object already there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "Unexpected generic error in sdb") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "Unexpected generic error in sdb")
......
...@@ -836,6 +836,7 @@ typedef struct { ...@@ -836,6 +836,7 @@ typedef struct {
char encrypt; char encrypt;
char secret[TSDB_KEY_LEN]; char secret[TSDB_KEY_LEN];
char ckey[TSDB_KEY_LEN]; char ckey[TSDB_KEY_LEN];
char reason[TSDB_REASON_LEN];
} SAuthMsg, SAuthRsp; } SAuthMsg, SAuthRsp;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -75,7 +75,7 @@ typedef struct SRpcInit { ...@@ -75,7 +75,7 @@ typedef struct SRpcInit {
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey, char *reason);
} SRpcInit; } SRpcInit;
int32_t rpcInit(); int32_t rpcInit();
......
...@@ -45,6 +45,7 @@ extern int32_t sdbDebugFlag; ...@@ -45,6 +45,7 @@ extern int32_t sdbDebugFlag;
#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } #define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monitorSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) } #define mLInfo(...) { monitorSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -37,14 +37,23 @@ ...@@ -37,14 +37,23 @@
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeProfile.h" #include "mnodeProfile.h"
typedef struct { typedef struct {
const char *const name; const char *const name;
int (*init)(); int (*init)();
void (*cleanup)(); void (*cleanup)();
} SMnodeComponent; } SMnodeComponent;
void *tsMnodeTmr = NULL; typedef enum {
static bool tsMgmtIsRunning = false; TSDB_MND_STATUS_NOT_RUNNING,
TSDB_MND_STATUS_INIT,
TSDB_MND_STATUS_READY,
TSDB_MND_STATUS_CLEANING,
} EMndStatus;
void *tsMnodeTmr = NULL;
static EMndStatus tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING;
char tsMgmtInitStr[TSDB_REASON_LEN];
static const SMnodeComponent tsMnodeComponents[] = { static const SMnodeComponent tsMnodeComponents[] = {
{"profile", mnodeInitProfile, mnodeCleanupProfile}, {"profile", mnodeInitProfile, mnodeCleanupProfile},
...@@ -75,37 +84,44 @@ static void mnodeCleanupComponents(int32_t stepId) { ...@@ -75,37 +84,44 @@ static void mnodeCleanupComponents(int32_t stepId) {
static int32_t mnodeInitComponents() { static int32_t mnodeInitComponents() {
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) { for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
snprintf(tsMgmtInitStr, sizeof(tsMgmtInitStr), "start to init %s", tsMnodeComponents[i].name);
if (tsMnodeComponents[i].init() != 0) { if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i); mnodeCleanupComponents(i);
code = -1; code = -1;
break; break;
} }
sleep(3);
} }
return code; return code;
} }
int32_t mnodeStartSystem() { int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) { if (tsMgmtStatus != TSDB_MND_STATUS_NOT_RUNNING) {
mInfo("mnode module already started..."); mInfo("mnode module already started...");
return 0; return 0;
} }
tsMgmtStatus = TSDB_MND_STATUS_INIT;
mInfo("starting to initialize mnode ..."); mInfo("starting to initialize mnode ...");
if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) { if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) {
mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno)); mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno));
return -1; return -1;
} }
sleep(3);
snprintf(tsMgmtInitStr, sizeof(tsMgmtInitStr), "start to init queues");
dnodeAllocMWritequeue(); dnodeAllocMWritequeue();
dnodeAllocMReadQueue(); dnodeAllocMReadQueue();
dnodeAllocateMPeerQueue(); dnodeAllocateMPeerQueue();
sleep(3);
if (mnodeInitComponents() != 0) { if (mnodeInitComponents() != 0) {
return -1; return -1;
} }
snprintf(tsMgmtInitStr, sizeof(tsMgmtInitStr), "mnode is ready");
grantReset(TSDB_GRANT_ALL, 0); grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true; tsMgmtStatus = TSDB_MND_STATUS_READY;
mInfo("mnode is initialized successfully"); mInfo("mnode is initialized successfully");
...@@ -123,9 +139,9 @@ int32_t mnodeInitSystem() { ...@@ -123,9 +139,9 @@ int32_t mnodeInitSystem() {
} }
void mnodeCleanupSystem() { void mnodeCleanupSystem() {
if (tsMgmtIsRunning) { if (tsMgmtStatus != TSDB_MND_STATUS_READY) {
mInfo("starting to clean up mnode"); mInfo("starting to clean up mnode");
tsMgmtIsRunning = false; tsMgmtStatus = TSDB_MND_STATUS_CLEANING;
dnodeFreeMWritequeue(); dnodeFreeMWritequeue();
dnodeFreeMReadQueue(); dnodeFreeMReadQueue();
...@@ -134,6 +150,7 @@ void mnodeCleanupSystem() { ...@@ -134,6 +150,7 @@ void mnodeCleanupSystem() {
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mInfo("mnode is cleaned up"); mInfo("mnode is cleaned up");
tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING;
} }
} }
...@@ -183,5 +200,10 @@ static bool mnodeNeedStart() { ...@@ -183,5 +200,10 @@ static bool mnodeNeedStart() {
} }
bool mnodeIsRunning() { bool mnodeIsRunning() {
return tsMgmtIsRunning; return (tsMgmtStatus == TSDB_MND_STATUS_READY || tsMgmtStatus == TSDB_MND_STATUS_INIT);
} }
bool mnodeIsReady() {
return (tsMgmtStatus == TSDB_MND_STATUS_READY);
}
...@@ -296,7 +296,7 @@ void sdbUpdateAsync() { ...@@ -296,7 +296,7 @@ void sdbUpdateAsync() {
void sdbUpdateSync(void *pMnodes) { void sdbUpdateSync(void *pMnodes) {
SMnodeInfos *mnodes = pMnodes; SMnodeInfos *mnodes = pMnodes;
if (!mnodeIsRunning()) { if (!mnodeIsReady()) {
mDebug("vgId:1, mnode not start yet, update sync config later"); mDebug("vgId:1, mnode not start yet, update sync config later");
return; return;
} }
......
...@@ -582,16 +582,27 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { ...@@ -582,16 +582,27 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
mDebug("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers); mDebug("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
} }
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey, char *reason) {
*secret = 0; *reason = 0;
if (!mnodeIsRunning()) {
mDebug("user:%s, failed to auth user, mnode is not running", user);
return TSDB_CODE_RPC_REDIRECT;
}
if (!mnodeIsReady()) {
mDebug("user:%s, failed to auth user, mnode is not ready", user);
strcpy(reason, tsMgmtInitStr);
return TSDB_CODE_MND_INIT;
}
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
*secret = 0;
mDebug("user:%s, failed to auth user, mnode is not master", user); mDebug("user:%s, failed to auth user, mnode is not master", user);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_RPC_REDIRECT;
} }
SUserObj *pUser = mnodeGetUser(user); SUserObj *pUser = mnodeGetUser(user);
if (pUser == NULL) { if (pUser == NULL) {
*secret = 0;
mError("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_MND_INVALID_USER)); mError("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_MND_INVALID_USER));
return TSDB_CODE_MND_INVALID_USER; return TSDB_CODE_MND_INVALID_USER;
} else { } else {
...@@ -613,5 +624,5 @@ static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) { ...@@ -613,5 +624,5 @@ static int32_t mnodeProcessAuthMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pAuthRsp; pMsg->rpcRsp.rsp = pAuthRsp;
pMsg->rpcRsp.len = sizeof(SAuthRsp); pMsg->rpcRsp.len = sizeof(SAuthRsp);
return mnodeRetriveAuth(pAuthMsg->user, &pAuthRsp->spi, &pAuthRsp->encrypt, pAuthRsp->secret, pAuthRsp->ckey); return mnodeRetriveAuth(pAuthMsg->user, &pAuthRsp->spi, &pAuthRsp->encrypt, pAuthRsp->secret, pAuthRsp->ckey, pAuthRsp->reason);
} }
...@@ -34,6 +34,7 @@ typedef struct { ...@@ -34,6 +34,7 @@ typedef struct {
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
int connType; int connType;
char reason[TSDB_REASON_LEN];
void *shandle; void *shandle;
void *thandle; void *thandle;
void *chandle; void *chandle;
......
...@@ -56,7 +56,7 @@ typedef struct { ...@@ -56,7 +56,7 @@ typedef struct {
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey, char *reason);
int32_t refCount; int32_t refCount;
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
...@@ -726,7 +726,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -726,7 +726,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn->user[0] == 0) { if (pConn->user[0] == 0) {
terrno = TSDB_CODE_RPC_AUTH_REQUIRED; terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
} else { } else {
terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey, pRecv->reason);
} }
if (terrno != 0) { if (terrno != 0) {
...@@ -1053,6 +1053,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { ...@@ -1053,6 +1053,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
return NULL; return NULL;
} }
pRecv->reason[0] = 0;
terrno = 0; terrno = 0;
SRpcReqContext *pContext; SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
...@@ -1218,7 +1219,7 @@ static void rpcSendReqHead(SRpcConn *pConn) { ...@@ -1218,7 +1219,7 @@ static void rpcSendReqHead(SRpcConn *pConn) {
static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
SRpcHead *pRecvHead, *pReplyHead; SRpcHead *pRecvHead, *pReplyHead;
char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) ]; char msg[sizeof(SRpcHead) + sizeof(SRpcDigest) + sizeof(uint32_t) + TSDB_REASON_LEN];
uint32_t timeStamp; uint32_t timeStamp;
int msgLen; int msgLen;
...@@ -1247,6 +1248,11 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { ...@@ -1247,6 +1248,11 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) {
msgLen += sizeof(timeStamp); msgLen += sizeof(timeStamp);
} }
if (code == TSDB_CODE_MND_INIT) {
strncpy((char *)pReplyHead->content, pRecv->reason, TSDB_REASON_LEN);
msgLen += TSDB_REASON_LEN;
}
pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pReplyHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
(*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle); (*taosSendData[pRecv->connType])(pRecv->ip, pRecv->port, msg, msgLen, pRecv->chandle);
...@@ -1551,7 +1557,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { ...@@ -1551,7 +1557,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
if ( !rpcIsReq(pHead->msgType) ) { if ( !rpcIsReq(pHead->msgType) ) {
// for response, if code is auth failure, it shall bypass the auth process // for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code); code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_MND_INIT ||
code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) { code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
......
...@@ -83,10 +83,11 @@ void processShellMsg() { ...@@ -83,10 +83,11 @@ void processShellMsg() {
} }
int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey, char *reason) {
// app shall retrieve the auth info based on meterID from DB or a data file // app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo // demo code here only for simple demo
int ret = 0; int ret = 0;
*reason = 0;
if (strcmp(meterId, "michael") == 0) { if (strcmp(meterId, "michael") == 0) {
*spi = 1; *spi = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册