未验证 提交 c7e8bdab 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1510 from taosdata/refactor/cluster

Refactor/cluster
......@@ -96,6 +96,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc);
leftLen -= sizeof(SMsgDesc);
if (pDesc->numOfVnodes > 1) {
pRpcContext = calloc(sizeof(SRpcContext), 1);
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
......
......@@ -59,6 +59,7 @@ typedef struct {
char mnodeName[TSDB_DNODE_NAME_LEN + 1];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int syncFd;
void *hbTimer;
void *pSync;
......@@ -84,6 +85,7 @@ typedef struct {
char dnodeName[TSDB_DNODE_NAME_LEN + 1];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
SVnodeLoad vload[TSDB_MAX_VNODES];
int32_t status;
uint32_t lastReboot; // time stamp for last reboot
......@@ -102,9 +104,8 @@ typedef struct {
} SVnodeGid;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
} STableInfo;
typedef struct SSuperTableObj {
......@@ -116,6 +117,7 @@ typedef struct SSuperTableObj {
int32_t numOfTags;
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int32_t numOfTables;
int16_t nextColId;
SSchema * schema;
......@@ -134,6 +136,7 @@ typedef struct {
int8_t reserved[1];
int8_t updateEnd[1];
int16_t nextColId; //used by normal table
int32_t refCount;
char* sql; //used by normal table
SSchema* schema; //used by normal table
SSuperTableObj *superTable;
......@@ -150,6 +153,7 @@ typedef struct _vg_obj {
int8_t lbStatus;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
struct _vg_obj *prev, *next;
struct _db_obj *pDb;
int32_t numOfTables;
......@@ -164,7 +168,7 @@ typedef struct _db_obj {
SDbCfg cfg;
int8_t reserved[15];
int8_t updateEnd[1];
struct _db_obj *prev, *next;
int32_t refCount;
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
......@@ -182,7 +186,7 @@ typedef struct _user_obj {
int8_t writeAuth;
int8_t reserved[13];
int8_t updateEnd[1];
struct _user_obj *prev, *next;
int32_t refCount;
struct _acctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
......@@ -215,9 +219,8 @@ typedef struct _acctObj {
int8_t dirty;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
SAcctInfo acctInfo;
SDbObj * pHead;
SUserObj * pUser;
pthread_mutex_t mutex;
} SAcctObj;
......@@ -247,8 +250,12 @@ typedef struct {
void *ahandle;
void *thandle;
void *pCont;
SDbObj *pDb;
SAcctObj *pAcct;
SDnodeObj*pDnode;
SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableInfo *pTable;
} SQueuedMsg;
int32_t mgmtInitSystem();
......
......@@ -14,6 +14,4 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY(mnode ${SRC})
TARGET_LINK_LIBRARIES(mnode trpc tutil pthread)
ENDIF ()
ENDIF ()
\ No newline at end of file
......@@ -30,12 +30,14 @@ typedef enum {
int32_t acctInit();
void acctCleanUp();
SAcctObj *acctGetAcct(char *acctName);
void acctIncRef(SAcctObj *pAcct);
void acctDecRef(SAcctObj *pAcct);
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type);
int32_t acctAddDb(SAcctObj *pAcct, SDbObj *pDb);
int32_t acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb);
int32_t acctAddUser(SAcctObj *pAcct, SUserObj *pUser);
int32_t acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser);
void acctAddDb(SAcctObj *pAcct, SDbObj *pDb);
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb);
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser);
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_CHILD_TABLE_H
#define TBASE_MNODE_CHILD_TABLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "mnode.h"
int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
void mgmtCreateChildTable(SQueuedMsg *pMsg);
void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
#ifdef __cplusplus
}
#endif
#endif
......@@ -27,6 +27,8 @@ int32_t mgmtInitDbs();
void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db);
void mgmtIncDbRef(SDbObj *pDb);
void mgmtDecDbRef(SDbObj *pDb);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
void mgmtDropAllDbs(SAcctObj *pAcct);
......
......@@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
void * mgmtMallocQueuedMsg(SRpcMsg *rpcMsg);
void * mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
#ifdef __cplusplus
......
......@@ -44,6 +44,7 @@ typedef struct {
char *tableName;
int32_t hashSessions;
int32_t maxRowSize;
int32_t refCountPos;
ESdbKeyType keyType;
int32_t (*insertFp)(SSdbOperDesc *pOper);
int32_t (*deleteFp)(SSdbOperDesc *pOper);
......@@ -62,6 +63,8 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper);
void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
void sdbIncRef(void *thandle, void *pRow);
void sdbDecRef(void *thandle, void *pRow);
int64_t sdbGetNumOfRows(void *handle);
int64_t sdbGetId(void *handle);
uint64_t sdbGetVersion();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_SUPER_TABLE_H
#define TBASE_MNODE_SUPER_TABLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "mnode.h"
int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
void mgmtCreateSuperTable(SQueuedMsg *pMsg);
void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
#ifdef __cplusplus
}
#endif
#endif
......@@ -28,7 +28,10 @@ extern "C" {
int32_t mgmtInitTables();
void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char* tableId);
void mgmtExtractTableName(char* tableId, char* tableName);
void mgmtIncTableRef(void *pTable);
void mgmtDecTableRef(void *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllSuperTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
......
......@@ -24,6 +24,8 @@ extern "C" {
int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name);
void mgmtIncUserRef(SUserObj *pUser);
void mgmtDecUserRef(SUserObj *pUser);
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
void mgmtDropAllUsers(SAcctObj *pAcct);
......
......@@ -27,9 +27,11 @@ extern "C" {
int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId);
void mgmtIncVgroupRef(SVgObj *pVgroup);
void mgmtDecVgroupRef(SVgObj *pVgroup);
void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg);
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
......
......@@ -18,6 +18,8 @@
#include "taoserror.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtUser.h"
#ifndef _ACCOUNT
static SAcctObj tsAcctObj = {0};
......@@ -30,79 +32,31 @@ int32_t acctInit() {
void acctCleanUp() {}
SAcctObj *acctGetAcct(char *acctName) { return &tsAcctObj; }
void acctIncRef(SAcctObj *pAcct) {}
void acctDecRef(SAcctObj *pAcct) {}
int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; }
#endif
int32_t acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex);
pDb->next = pAcct->pHead;
pDb->prev = NULL;
void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
atomic_add_fetch_32(&pAcct->acctInfo.numOfDbs, 1);
pDb->pAcct = pAcct;
if (pAcct->pHead) {
pAcct->pHead->prev = pDb;
}
pAcct->pHead = pDb;
pAcct->acctInfo.numOfDbs++;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
acctIncRef(pAcct);
}
int32_t acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex);
if (pDb->prev) {
pDb->prev->next = pDb->next;
}
if (pDb->next) {
pDb->next->prev = pDb->prev;
}
if (pDb->prev == NULL) {
pAcct->pHead = pDb->next;
}
pAcct->acctInfo.numOfDbs--;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1);
pDb->pAcct = NULL;
acctIncRef(pAcct);
}
int32_t acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex);
pUser->next = pAcct->pUser;
pUser->prev = NULL;
if (pAcct->pUser) {
pAcct->pUser->prev = pUser;
}
pAcct->pUser = pUser;
pAcct->acctInfo.numOfUsers++;
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
atomic_add_fetch_32(&pAcct->acctInfo.numOfUsers, 1);
pUser->pAcct = pAcct;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
acctIncRef(pAcct);
}
int32_t acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex);
if (pUser->prev) {
pUser->prev->next = pUser->next;
}
if (pUser->next) {
pUser->next->prev = pUser->prev;
}
if (pUser->prev == NULL) {
pAcct->pUser = pUser->next;
}
pAcct->acctInfo.numOfUsers--;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1);
pUser->pAcct = NULL;
acctIncRef(pAcct);
}
\ No newline at end of file
此差异已折叠。
......@@ -22,24 +22,22 @@
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtShell.h"
#include "mgmtMnode.h"
#include "mgmtChildTable.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
void * tsDbSdb = NULL;
static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(void *handle, void *tmrId);
static void mgmtDropDb(SQueuedMsg *newMsg);
static int32_t mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......@@ -58,8 +56,6 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
pDb->pHead = NULL;
pDb->pTail = NULL;
pDb->prev = NULL;
pDb->next = NULL;
pDb->numOfVgroups = 0;
pDb->numOfTables = 0;
pDb->numOfSuperTables = 0;
......@@ -120,6 +116,7 @@ int32_t mgmtInitDbs() {
.tableName = "dbs",
.hashSessions = TSDB_MAX_DBS,
.maxRowSize = tsDbUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtDbActionInsert,
.deleteFp = mgmtDbActionDelete,
......@@ -149,6 +146,14 @@ SDbObj *mgmtGetDb(char *db) {
return (SDbObj *)sdbGetRow(tsDbSdb, db);
}
void mgmtIncDbRef(SDbObj *pDb) {
return sdbIncRef(tsDbSdb, pDb);
}
void mgmtDecDbRef(SDbObj *pDb) {
return sdbDecRef(tsDbSdb, pDb);
}
SDbObj *mgmtGetDbByTableId(char *tableId) {
char db[TSDB_TABLE_ID_LEN], *pos;
......@@ -282,8 +287,9 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
return code;
}
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db);
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb != NULL) {
mgmtDecDbRef(pDb);
return TSDB_CODE_DB_ALREADY_EXIST;
}
......@@ -511,16 +517,14 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
pShow->pNode = pUser->pAcct->pHead;
mgmtDecUserRef(pUser);
return 0;
}
static char *mgmtGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER);
return ++pos;
}
......@@ -533,14 +537,8 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
if (pUser == NULL) return 0;
while (numOfRows < rows) {
pDb = (SDbObj *)pShow->pNode;
pShow->pNode = sdbFetchRow(tsDbSdb, pShow->pNode, (void **) &pDb);
if (pDb == NULL) break;
pShow->pNode = (void *)pDb->next;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) {
continue;
}
}
cols = 0;
......@@ -637,25 +635,32 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols++;
numOfRows++;
mgmtDecDbRef(pDb);
}
pShow->numOfReads += numOfRows;
mgmtDecUserRef(pUser);
return numOfRows;
}
void mgmtAddSuperTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, 1);
mgmtIncDbRef(pDb);
}
void mgmtRemoveSuperTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, -1);
mgmtDecDbRef(pDb);
}
void mgmtAddTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, 1);
mgmtIncDbRef(pDb);
}
void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
mgmtDecDbRef(pDb);
}
static int32_t mgmtSetDbDirty(SDbObj *pDb) {
......@@ -766,8 +771,6 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
}
static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMAlterDbMsg *pAlter = pMsg->pCont;
mTrace("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->thandle);
......@@ -777,13 +780,7 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to alter, no rights", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pAlter->db);
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pAlter->db);
if (pDb == NULL) {
mError("db:%s, failed to alter, invalid db", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
......@@ -794,15 +791,13 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg);
......@@ -810,15 +805,11 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
}
mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(newMsg->thandle, TSDB_CODE_SUCCESS);
rpcFreeCont(newMsg->pCont);
free(newMsg);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
}
static void mgmtDropDb(void *handle, void *tmrId) {
SQueuedMsg *newMsg = handle;
SDbObj *pDb = newMsg->ahandle;
static void mgmtDropDb(SQueuedMsg *pMsg) {
SDbObj *pDb = pMsg->pDb;
mPrint("db:%s, drop db from sdb", pDb->name);
SSdbOperDesc oper = {
......@@ -831,14 +822,10 @@ static void mgmtDropDb(void *handle, void *tmrId) {
code = TSDB_CODE_SDB_ERROR;
}
mgmtSendSimpleResp(newMsg->thandle, code);
rpcFreeCont(newMsg->pCont);
free(newMsg);
mgmtSendSimpleResp(pMsg->thandle, code);
}
static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckRedirect(pMsg->thandle)) return;
SCMDropDbMsg *pDrop = pMsg->pCont;
mTrace("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->thandle);
......@@ -848,13 +835,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return;
}
if (!pMsg->pUser->writeAuth) {
mError("db:%s, failed to drop, no rights", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
SDbObj *pDb = mgmtGetDb(pDrop->db);
SDbObj *pDb = pMsg->pDb = mgmtGetDb(pDrop->db);
if (pDb == NULL) {
if (pDrop->ignoreNotExists) {
mTrace("db:%s, db is not exist, think drop success", pDrop->db);
......@@ -880,13 +861,10 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return;
}
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) {
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg);
......@@ -894,10 +872,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
}
mTrace("db:%s, all vgroups is dropped", pDb->name);
void *tmpTmr;
newMsg->ahandle = pDb;
taosTmrReset(mgmtDropDb, 10, newMsg, tsMgmtTmr, &tmpTmr);
mgmtDropDb(pMsg);
}
void mgmtDropAllDbs(SAcctObj *pAcct) {
......@@ -913,6 +888,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
mgmtSetDbDirty(pDb);
numOfDbs++;
}
mgmtDecDbRef(pDb);
}
mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs);
......
......@@ -232,6 +232,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
}
mgmtDecVgroupRef(pVgroup);
}
if (pDnode->status != TSDB_DN_STATUS_READY) {
......
......@@ -16,10 +16,13 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "mgmtDb.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg);
......@@ -763,12 +766,49 @@ int32_t mgmtInitProfile() {
void mgmtCleanUpProfile() {
}
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
bool usePublicIp = false;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
if (pUser == NULL) {
return NULL;
}
SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg));
pMsg->thandle = rpcMsg->handle;
pMsg->msgType = rpcMsg->msgType;
pMsg->contLen = rpcMsg->contLen;
pMsg->pCont = rpcMsg->pCont;
pMsg->pUser = pUser;
pMsg->usePublicIp = usePublicIp;
return pMsg;
}
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if (pMsg != NULL) {
if (pMsg->pCont != NULL) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
rpcFreeCont(pMsg->pCont);
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
// if (pMsg->pAcct) acctDecRef(pMsg->pAcct);
// if (pMsg->pDnode) mgmtDecTableRef(pMsg->pDnode);
free(pMsg);
}
}
void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg));
pDestMsg->thandle = pSrcMsg->thandle;
pDestMsg->msgType = pSrcMsg->msgType;
pDestMsg->pCont = pSrcMsg->pCont;
pDestMsg->contLen = pSrcMsg->contLen;
pDestMsg->pUser = pSrcMsg->pUser;
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
pSrcMsg->pCont = NULL;
pSrcMsg->pUser = NULL;
return pDestMsg;
}
\ No newline at end of file
......@@ -24,6 +24,7 @@
#include "tutil.h"
#include "hashint.h"
#include "hashstr.h"
#include "mgmtSdb.h"
#define abs(x) (((x) < 0) ? -(x) : (x))
......@@ -46,6 +47,7 @@ typedef struct _SSdbTable {
int32_t tableId;
int32_t hashSessions;
int32_t maxRowSize;
int32_t refCountPos;
int32_t autoIndex;
int32_t fd;
int64_t numOfRows;
......@@ -318,11 +320,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
}
} else {
if (rowHead->version < 0) {
SSdbOperDesc oper = {
.table = pTable,
.pObj = pMetaRow
};
(*pTable->destroyFp)(&oper);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
pTable->numOfRows--;
sdbTrace("table:%s, version:%" PRId64 " numOfRows:%d, read deleted record:%s",
......@@ -338,8 +335,6 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
.rowSize = rowHead->rowSize,
.pObj = pMetaRow
};
(*pTable->destroyFp)(&oper);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
int32_t code = (*pTable->decodeFp)(&oper);
......@@ -373,6 +368,7 @@ static int32_t sdbInitTableByFile(SSdbTable *pTable) {
.version = pMeta->version,
};
sdbIncRef(pTable, oper.pObj);
int32_t code = (*pTable->insertFp)(&oper);
if (code != TSDB_CODE_SUCCESS) {
sdbError("table:%s, failed to insert record:%s", pTable->tableName, sdbGetkeyStr(pTable, rowHead->data));
......@@ -396,6 +392,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable->keyType = pDesc->keyType;
pTable->hashSessions = pDesc->hashSessions;
pTable->maxRowSize = pDesc->maxRowSize;
pTable->refCountPos = pDesc->refCountPos;
pTable->insertFp = pDesc->insertFp;
pTable->deleteFp = pDesc->deleteFp;
pTable->updateFp = pDesc->updateFp;
......@@ -433,6 +430,34 @@ static SRowMeta *sdbGetRowMeta(void *handle, void *key) {
return pMeta;
}
void sdbIncRef(void *handle, void *pRow) {
if (pRow) {
SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1);
if (0) {
sdbTrace("table:%s, add ref to record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
}
}
}
void sdbDecRef(void *handle, void *pRow) {
if (pRow) {
SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
if (0) {
sdbTrace("table:%s, def ref of record:%s:%s:%d", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
}
int8_t* updateEnd = pRow + pTable->refCountPos - 1;
if (refCount <= 0 && *updateEnd) {
sdbTrace("table:%s, record:%s:%s:%d is destroyed", pTable->tableName, pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper);
}
}
}
void *sdbGetRow(void *handle, void *key) {
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta;
......@@ -441,6 +466,7 @@ void *sdbGetRow(void *handle, void *key) {
pthread_mutex_lock(&pTable->mutex);
pMeta = (*sdbGetIndexFp[pTable->keyType])(pTable->iHandle, key);
if (pMeta) sdbIncRef(pTable, pMeta->row);
pthread_mutex_unlock(&pTable->mutex);
if (pMeta == NULL) {
......@@ -459,6 +485,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
if (sdbGetRow(pTable, pOper->pObj)) {
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetkeyStr(pTable, pOper->pObj));
sdbDecRef(pTable, pOper->pObj);
return TSDB_CODE_ALREADY_THERE;
}
......@@ -526,6 +553,7 @@ int32_t sdbInsertRow(SSdbOperDesc *pOper) {
rowMeta.rowSize = pOper->rowSize;
rowMeta.row = pOper->pObj;
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, pOper->pObj, &rowMeta);
sdbIncRef(pTable, pOper->pObj);
pTable->numOfRows++;
......@@ -626,8 +654,9 @@ int32_t sdbDeleteRow(SSdbOperDesc *pOper) {
pthread_mutex_unlock(&pTable->mutex);
(*pTable->deleteFp)(pOper);
(*pTable->destroyFp)(pOper);
int8_t* updateEnd = pOper->pObj + pTable->refCountPos - 1;
*updateEnd = 1;
sdbDecRef(pTable, pOper->pObj);
return 0;
}
......@@ -762,6 +791,7 @@ void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
if (pMeta == NULL) return NULL;
*ppRow = pMeta->row;
sdbIncRef(handle, pMeta->row);
return pNode;
}
......@@ -25,7 +25,6 @@
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
......@@ -33,7 +32,6 @@
#include "mgmtProfile.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
......@@ -121,8 +119,7 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) {
void mgmtProcessTranRequest(SSchedMsg *sched) {
SQueuedMsg *queuedMsg = sched->msg;
(*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg);
rpcFreeCont(queuedMsg->pCont);
free(queuedMsg);
mgmtFreeQueuedMsg(queuedMsg);
}
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
......@@ -137,6 +134,12 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return;
}
if (mgmtCheckRedirect(rpcMsg->handle)) {
// send resp in redirect func
rpcFreeCont(rpcMsg->pCont);
return;
}
if (!mgmtInServerStatus()) {
mgmtProcessMsgWhileNotReady(rpcMsg);
rpcFreeCont(rpcMsg->pCont);
......@@ -145,6 +148,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
rpcFreeCont(rpcMsg->pCont);
return;
}
......@@ -154,44 +158,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return;
}
bool usePublicIp = false;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
if (pUser == NULL) {
SQueuedMsg *pMsg = mgmtMallocQueuedMsg(rpcMsg);
if (pMsg == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
SQueuedMsg queuedMsg = {0};
queuedMsg.thandle = rpcMsg->handle;
queuedMsg.msgType = rpcMsg->msgType;
queuedMsg.contLen = rpcMsg->contLen;
queuedMsg.pCont = rpcMsg->pCont;
queuedMsg.pUser = pUser;
queuedMsg.usePublicIp = usePublicIp;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg);
rpcFreeCont(rpcMsg->pCont);
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
mgmtFreeQueuedMsg(pMsg);
} else {
SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg));
queuedMsg->thandle = rpcMsg->handle;
queuedMsg->msgType = rpcMsg->msgType;
queuedMsg->contLen = rpcMsg->contLen;
queuedMsg->pCont = rpcMsg->pCont;
queuedMsg->pUser = pUser;
queuedMsg->usePublicIp = usePublicIp;
mgmtAddToShellQueue(queuedMsg);
if (!pMsg->pUser->writeAuth) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
mgmtFreeQueuedMsg(pMsg);
} else {
mgmtAddToShellQueue(pMsg);
}
}
}
static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirect(pMsg->thandle)) {
return;
}
}
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE);
return;
......@@ -358,9 +346,11 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
SUserObj *pUser = mgmtGetUser(user);
if (pUser == NULL) {
*secret = 0;
mgmtDecUserRef(pUser);
return TSDB_CODE_INVALID_USER;
} else {
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
mgmtDecUserRef(pUser);
return TSDB_CODE_SUCCESS;
}
}
......@@ -376,28 +366,19 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
}
int32_t code;
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
code = TSDB_CODE_INVALID_USER;
goto connect_over;
}
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_GRANT_EXPIRED;
goto connect_over;
}
SAcctObj *pAcct = acctGetAcct(pUser->acct);
if (pAcct == NULL) {
code = TSDB_CODE_INVALID_ACCT;
goto connect_over;
}
code = taosCheckVersion(pConnectMsg->clientVersion, version, 3);
if (code != TSDB_CODE_SUCCESS) {
goto connect_over;
}
SUserObj *pUser = pMsg->pUser;
SAcctObj *pAcct = pUser->pAcct;
if (pConnectMsg->db[0]) {
char dbName[TSDB_TABLE_ID_LEN * 3] = {0};
sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
......@@ -419,7 +400,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth;
if (connInfo.serverIp == tsPublicIpInt) {
if (pMsg->usePublicIp) {
mgmtGetMnodePublicIpList(&pConnectRsp->ipList);
} else {
mgmtGetMnodePrivateIpList(&pConnectRsp->ipList);
......@@ -468,6 +449,7 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
mTrace("table:%s auto created task added", pInfo->tableId);
}
mgmtDecTableRef(pTable);
return addIntoTranQueue;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#################################
run general/user/testSuite.sim
run general/table/testSuite.sim
run general/user/basic1.sim
run general/db/basic1.sim
run general/table/basic1.sim
##################################
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create database
sql create database d1
sql show databases
if $rows != 1 then
return -1
endi
if $data00 != d1 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0 then
return -1
endi
print =============== drop database
sql drop database d1
sql show databases
if $rows != 0 then
return -1
endi
print =============== more databases
sql create database d2
sql create database d3
sql create database d4
sql show databases
if $rows != 3 then
return -1
endi
print =============== drop database
sql drop database d2
sql drop database d3
sql show databases
if $rows != 1 then
return -1
endi
if $data00 != d4 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0 then
return -1
endi
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册