未验证 提交 a49528de 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #1420 from taosdata/refact/slguan

[TD-15] fix error while create child table
...@@ -1746,7 +1746,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1746,7 +1746,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_META; pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -98,67 +98,45 @@ typedef struct { ...@@ -98,67 +98,45 @@ typedef struct {
} SVnodeGid; } SVnodeGid;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
int8_t type; int8_t type;
int8_t dirty; int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
} STableInfo; } STableInfo;
struct _vg_obj; struct _vg_obj;
typedef struct SSuperTableObj { typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1]; STableInfo info;
int8_t type; uint64_t uid;
int8_t dirty; int64_t createdTime;
uint64_t uid; int32_t sversion;
int32_t sid; int32_t numOfColumns;
int32_t vgId; int32_t numOfTags;
int64_t createdTime; int8_t reserved[15];
int32_t sversion; int8_t updateEnd[1];
int32_t numOfColumns; int32_t numOfTables;
int32_t numOfTags; int16_t nextColId;
int8_t reserved[15]; SSchema * schema;
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
SSchema *schema;
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; STableInfo info;
int8_t type; uint64_t uid;
int8_t dirty; int64_t createdTime;
uint64_t uid; int32_t sversion; //used by normal table
int32_t sid; int32_t numOfColumns; //used by normal table
int32_t vgId; int32_t sid;
int64_t createdTime; int32_t vgId;
char superTableId[TSDB_TABLE_ID_LEN + 1]; char superTableId[TSDB_TABLE_ID_LEN + 1];
int8_t reserved[1]; int32_t sqlLen;
int8_t updateEnd[1]; int8_t reserved[1];
int8_t updateEnd[1];
int16_t nextColId; //used by normal table
char* sql; //used by normal table
SSchema* schema; //used by normal table
SSuperTableObj *superTable; SSuperTableObj *superTable;
} SChildTableObj; } SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t sqlLen;
int8_t reserved[7];
int8_t updateEnd[1];
char* sql; //null-terminated string
int16_t nextColId;
SSchema* schema;
} SNormalTableObj;
struct _db_obj; struct _db_obj;
typedef struct _vg_obj { typedef struct _vg_obj {
...@@ -176,7 +154,7 @@ typedef struct _vg_obj { ...@@ -176,7 +154,7 @@ typedef struct _vg_obj {
struct _db_obj *pDb; struct _db_obj *pDb;
int32_t numOfTables; int32_t numOfTables;
void * idPool; void * idPool;
STableInfo ** tableList; SChildTableObj ** tableList;
} SVgObj; } SVgObj;
typedef struct _db_obj { typedef struct _db_obj {
...@@ -260,9 +238,10 @@ typedef struct { ...@@ -260,9 +238,10 @@ typedef struct {
typedef struct { typedef struct {
uint8_t msgType; uint8_t msgType;
int8_t expected; int8_t usePublicIp;
int8_t received; int8_t received;
int8_t successed; int8_t successed;
int8_t expected;
int32_t contLen; int32_t contLen;
int32_t code; int32_t code;
void *ahandle; void *ahandle;
......
...@@ -94,8 +94,8 @@ extern "C" { ...@@ -94,8 +94,8 @@ extern "C" {
#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 #define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62
#define TSDB_MSG_TYPE_CM_TABLE_META 63 #define TSDB_MSG_TYPE_CM_TABLE_META 63
#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 #define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64
#define TSDB_MSG_TYPE_CM_STABLE_META 65 #define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65
#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66 #define TSDB_MSG_TYPE_CM_STABLE_VGROUP_RSP 66
#define TSDB_MSG_TYPE_CM_TABLES_META 67 #define TSDB_MSG_TYPE_CM_TABLES_META 67
#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 #define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68
#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 #define TSDB_MSG_TYPE_CM_ALTER_STREAM 69
...@@ -738,8 +738,8 @@ typedef struct { ...@@ -738,8 +738,8 @@ typedef struct {
} SDMConfigTableMsg; } SDMConfigTableMsg;
typedef struct { typedef struct {
uint32_t dnode; uint32_t dnodeId;
int32_t vnode; int32_t vgId;
} SDMConfigVnodeMsg; } SDMConfigVnodeMsg;
typedef struct { typedef struct {
......
...@@ -23,23 +23,18 @@ extern "C" { ...@@ -23,23 +23,18 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include "taosdef.h" #include "taosdef.h"
#include "mnode.h" #include "mnode.h"
int32_t mgmtInitChildTables(); int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables(); void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId); void * mgmtGetChildTable(char *tableId);
void *mgmtCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); void mgmtCreateChildTable(SQueuedMsg *pMsg);
void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pCreate, SChildTableObj *pTable); void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable);
int32_t mgmtDropChildTable(SQueuedMsg *newMsg, SChildTableObj *pTable); void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,22 +22,23 @@ extern "C" { ...@@ -22,22 +22,23 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
void mgmtAddVgroupIntoDb(SVgObj *pVgroup); // api
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
int32_t mgmtInitDbs(); int32_t mgmtInitDbs();
void mgmtCleanUpDbs(); void mgmtCleanUpDbs();
SDbObj *mgmtGetDb(char *db); SDbObj *mgmtGetDb(char *db);
SDbObj *mgmtGetDbByTableId(char *db); SDbObj *mgmtGetDbByTableId(char *db);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb); bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
// util func
void mgmtAddSuperTableIntoDb(SDbObj *pDb); void mgmtAddSuperTableIntoDb(SDbObj *pDb);
void mgmtRemoveSuperTableFromDb(SDbObj *pDb); void mgmtRemoveSuperTableFromDb(SDbObj *pDb);
void mgmtAddTableIntoDb(SDbObj *pDb); void mgmtAddTableIntoDb(SDbObj *pDb);
void mgmtRemoveTableFromDb(SDbObj *pDb); void mgmtRemoveTableFromDb(SDbObj *pDb);
void mgmtAddVgroupIntoDb(SVgObj *pVgroup);
void mgmtAddVgroupIntoDbTail(SVgObj *pVgroup);
void mgmtRemoveVgroupFromDb(SVgObj *pVgroup);
void mgmtMoveVgroupToTail(SVgObj *pVgroup);
void mgmtMoveVgroupToHead(SVgObj *pVgroup);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,14 +21,14 @@ extern "C" { ...@@ -21,14 +21,14 @@ extern "C" {
#endif #endif
int32_t mgmtInitMnodes(); int32_t mgmtInitMnodes();
void mgmtCleanupMnodes(); void mgmtCleanupMnodes();
bool mgmtInServerStatus(); bool mgmtInServerStatus();
bool mgmtIsMaster(); bool mgmtIsMaster();
bool mgmtCheckRedirect(void *handle); bool mgmtCheckRedirect(void *handle);
void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet); void mgmtGetMnodePrivateIpList(SRpcIpSet *ipSet);
void mgmtGetMnodePublicIpList(SRpcIpSet *ipSet); void mgmtGetMnodePublicIpList(SRpcIpSet *ipSet);
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp); int32_t mgmtRemoveMnode(uint32_t privateIp);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TBASE_MNODE_NORMAL_TABLE_H
#define TBASE_MNODE_NORMAL_TABLE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
void * mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
void * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtDropAllNormalTables(SDbObj *pDropDb);
#ifdef __cplusplus
}
#endif
#endif
...@@ -24,11 +24,11 @@ extern "C" { ...@@ -24,11 +24,11 @@ extern "C" {
int32_t mgmtInitProfile(); int32_t mgmtInitProfile();
void mgmtCleanUpProfile(); void mgmtCleanUpProfile();
bool mgmtCheckQhandle(uint64_t qhandle); bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle); void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -53,16 +53,16 @@ typedef struct { ...@@ -53,16 +53,16 @@ typedef struct {
int32_t (*destroyFp)(SSdbOperDesc *pDesc); int32_t (*destroyFp)(SSdbOperDesc *pDesc);
} SSdbTableDesc; } SSdbTableDesc;
void *sdbOpenTable(SSdbTableDesc *desc); void * sdbOpenTable(SSdbTableDesc *desc);
void sdbCloseTable(void *handle); void sdbCloseTable(void *handle);
int32_t sdbInsertRow(SSdbOperDesc *pOper); int32_t sdbInsertRow(SSdbOperDesc *pOper);
int32_t sdbDeleteRow(SSdbOperDesc *pOper); int32_t sdbDeleteRow(SSdbOperDesc *pOper);
int32_t sdbUpdateRow(SSdbOperDesc *pOper); int32_t sdbUpdateRow(SSdbOperDesc *pOper);
void *sdbGetRow(void *handle, void *key); void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pNode, void **ppRow); void *sdbFetchRow(void *handle, void *pNode, void **ppRow);
int64_t sdbGetNumOfRows(void *handle); int64_t sdbGetNumOfRows(void *handle);
uint64_t sdbGetVersion(); uint64_t sdbGetVersion();
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,8 +22,8 @@ extern "C" { ...@@ -22,8 +22,8 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
int32_t mgmtInitShell(); int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg)); void mgmtAddShellMsgHandle(uint8_t msgType, void (*fp)(SQueuedMsg *queuedMsg));
typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......
...@@ -22,31 +22,19 @@ extern "C" { ...@@ -22,31 +22,19 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
#include "taosdef.h" #include "taosdef.h"
#include "mnode.h" #include "mnode.h"
int32_t mgmtInitSuperTables(); int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables(); void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId); void * mgmtGetSuperTable(char *tableId);
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate); void mgmtCreateSuperTable(SQueuedMsg *pMsg);
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pTable); void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName);
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb); void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtExtractTableName(const char* tableId, char* name); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,13 +27,8 @@ extern "C" { ...@@ -27,13 +27,8 @@ extern "C" {
int32_t mgmtInitTables(); int32_t mgmtInitTables();
void mgmtCleanUpTables(); void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char *tableId); STableInfo* mgmtGetTable(char* tableId);
void mgmtExtractTableName(char* tableId, char* tableName);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMetaMsg *pMeta, bool usePublicIp);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
int32_t mgmtInitUsers(); int32_t mgmtInitUsers();
void mgmtCleanUpUsers(); void mgmtCleanUpUsers();
SUserObj *mgmtGetUser(char *name); SUserObj *mgmtGetUser(char *name);
SUserObj *mgmtGetUserFromConn(void *pConn); SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,17 +27,15 @@ extern "C" { ...@@ -27,17 +27,15 @@ extern "C" {
int32_t mgmtInitVgroups(); int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups(); void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode);
void mgmtDropAllVgroups(SDbObj *pDropDb); void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg); void mgmtCreateVgroup(SQueuedMsg *pMsg);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtUpdateVgroup(SVgObj *pVgroup);
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable); void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle); void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle); void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle);
......
此差异已折叠。
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#include "mgmtGrant.h" #include "mgmtGrant.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtChildTable.h" #include "mgmtChildTable.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtSuperTable.h" #include "mgmtSuperTable.h"
...@@ -42,7 +41,6 @@ static int32_t tsDbUpdateSize; ...@@ -42,7 +41,6 @@ static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static void mgmtDropDb(void *handle, void *tmrId); static void mgmtDropDb(void *handle, void *tmrId);
static int32_t mgmtSetDbDirty(SDbObj *pDb); static int32_t mgmtSetDbDirty(SDbObj *pDb);
static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg);
...@@ -82,7 +80,6 @@ static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) { ...@@ -82,7 +80,6 @@ static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) {
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
mgmtRemoveDbFromAcct(pAcct, pDb); mgmtRemoveDbFromAcct(pAcct, pDb);
mgmtDropAllNormalTables(pDb);
mgmtDropAllChildTables(pDb); mgmtDropAllChildTables(pDb);
mgmtDropAllSuperTables(pDb); mgmtDropAllSuperTables(pDb);
mgmtDropAllVgroups(pDb); mgmtDropAllVgroups(pDb);
...@@ -382,7 +379,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) ...@@ -382,7 +379,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
int32_t cols = 0; int32_t cols = 0;
SSchema *pSchema = pMeta->schema; SSchema *pSchema = pMeta->schema;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
pShow->bytes[cols] = TSDB_DB_NAME_LEN; pShow->bytes[cols] = TSDB_DB_NAME_LEN;
...@@ -532,7 +529,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -532,7 +529,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
char * pWrite; char * pWrite;
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
while (numOfRows < rows) { while (numOfRows < rows) {
......
...@@ -76,7 +76,7 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { ...@@ -76,7 +76,7 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
...@@ -169,7 +169,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -169,7 +169,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
...@@ -256,7 +256,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -256,7 +256,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......
...@@ -118,7 +118,7 @@ static void *mgmtGetNextMnode(SShowObj *pShow, SMnodeObj **pMnode) { ...@@ -118,7 +118,7 @@ static void *mgmtGetNextMnode(SShowObj *pShow, SMnodeObj **pMnode) {
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "ttime.h"
#include "tstatus.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtSdb.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
void *tsNormalTableSdb;
int32_t tsNormalTableUpdateSize;
static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
tfree(pTable->schema);
tfree(pTable->sql);
tfree(pTable);
}
static int32_t mgmtNormalTableActionDestroy(SSdbOperDesc *pOper) {
mgmtDestroyNormalTable(pOper->pObj);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionInsert(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("ntable:%s not in vgroup:%d", pTable->tableId, pTable->vgId);
return TSDB_CODE_INVALID_VGROUP_ID;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("ntable:%s, account:%s not exists", pTable->tableId, pDb->cfg.acct);
return TSDB_CODE_INVALID_ACCT;
}
mgmtAddTimeSeries(pAcct, pTable->numOfColumns - 1);
mgmtAddTableIntoDb(pDb);
mgmtAddTableIntoVgroup(pVgroup, (STableInfo *) pTable);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionDelete(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
if (pTable->vgId == 0) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_VGROUP_ID;
}
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
if (pDb == NULL) {
mError("ntable:%s, vgroup:%d not in DB:%s", pTable->tableId, pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_INVALID_DB;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("account not exists");
return TSDB_CODE_INVALID_ACCT;
}
mgmtRestoreTimeSeries(pAcct, pTable->numOfColumns - 1);
mgmtRemoveTableFromDb(pDb);
mgmtRemoveTableFromVgroup(pVgroup, (STableInfo *) pTable);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionUpdate(SSdbOperDesc *pOper) {
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionEncode(SSdbOperDesc *pOper) {
SNormalTableObj *pTable = pOper->pObj;
assert(pOper->pObj != NULL && pOper->rowData != NULL);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (pOper->maxRowSize < tsNormalTableUpdateSize + schemaSize) {
return TSDB_CODE_INVALID_MSG_LEN;
}
memcpy(pOper->rowData, pTable, tsNormalTableUpdateSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize, pTable->schema, schemaSize);
memcpy(pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sql, pTable->sqlLen);
pOper->rowSize = tsNormalTableUpdateSize + schemaSize + pTable->sqlLen;
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtNormalTableActionDecode(SSdbOperDesc *pOper) {
assert(pOper->rowData != NULL);
SNormalTableObj *pTable = (SNormalTableObj *)calloc(1, sizeof(SNormalTableObj));
if (pTable == NULL) TSDB_CODE_SERV_OUT_OF_MEMORY;
memcpy(pTable, pOper->rowData, tsNormalTableUpdateSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyNormalTable(pTable);
return -1;
}
memcpy(pTable->schema, pOper->rowData + tsNormalTableUpdateSize, schemaSize);
pTable->sql = (char *)malloc(pTable->sqlLen);
if (pTable->sql == NULL) {
mgmtDestroyNormalTable(pTable);
return -1;
}
memcpy(pTable->sql, pOper->rowData + tsNormalTableUpdateSize + schemaSize, pTable->sqlLen);
pOper->pObj = pTable;
return TSDB_CODE_SUCCESS;
}
int32_t mgmtInitNormalTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SNormalTableObj *pTable = NULL;
SNormalTableObj tObj;
tsNormalTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableName = "ntables",
.hashSessions = TSDB_MAX_NORMAL_TABLES,
.maxRowSize = sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
.keyType = SDB_KEY_TYPE_STRING,
.insertFp = mgmtNormalTableActionInsert,
.deleteFp = mgmtNormalTableActionDelete,
.updateFp = mgmtNormalTableActionUpdate,
.encodeFp = mgmtNormalTableActionEncode,
.decodeFp = mgmtNormalTableActionDecode,
.destroyFp = mgmtNormalTableActionDestroy,
};
tsNormalTableSdb = sdbOpenTable(&tableDesc);
if (tsNormalTableSdb == NULL) {
mError("failed to init ntables data");
return -1;
}
while (1) {
pLastNode = pNode;
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("ntable:%s, failed to get db, discard it", pTable->tableId);
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("ntable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->tableId, pTable->vgId, pTable->sid);
pTable->vgId = 0;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("ntable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
pTable->vgId = 0;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
if (pVgroup->tableList == NULL) {
mError("ntable:%s, vgroup:%d tableList is null", pTable->tableId, pTable->vgId);
pTable->vgId = 0;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_LOCAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
sdbDeleteRow(&desc);
pNode = pLastNode;
continue;
}
}
mTrace("ntables is initialized");
return 0;
}
void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb);
}
void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreate == NULL) {
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->type;
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
pCreate->sid = htonl(pTable->sid);
pCreate->sversion = htonl(pTable->sversion);
pCreate->tagDataLen = 0;
pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreate->uid = htobe64(pTable->uid);
pCreate->superTableUid = 0;
pCreate->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pCreate->data;
memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema));
for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
pSchema++;
}
memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreate;
}
void *mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
strcpy(pTable->tableId, pCreate->tableId);
pTable->type = TSDB_NORMAL_TABLE;
pTable->vgId = pVgroup->vgId;
pTable->createdTime = taosGetTimestampMs();
pTable->uid = (((uint64_t) pTable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sid = sid;
pTable->sversion = 0;
pTable->numOfColumns = htons(pCreate->numOfColumns);
pTable->sqlLen = htons(pCreate->sqlLen);
int32_t numOfCols = pTable->numOfColumns;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int32_t col = 0; col < numOfCols; col++) {
SSchema *tschema = pTable->schema;
tschema[col].colId = pTable->nextColId++;
tschema[col].bytes = htons(tschema[col].bytes);
}
if (pTable->sqlLen != 0) {
pTable->type = TSDB_STREAM_TABLE;
pTable->sql = calloc(1, pTable->sqlLen);
if (pTable->sql == NULL) {
free(pTable);
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
return NULL;
}
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pTable->sqlLen);
pTable->sql[pTable->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pTable->tableId, pTable->sqlLen, pTable->sql);
}
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
if (sdbInsertRow(&desc) < 0) {
mError("table:%s, update sdb error", pTable->tableId);
free(pTable);
terrno = TSDB_CODE_SDB_ERROR;
return NULL;
}
mTrace("table:%s, create ntable in vgroup, uid:%" PRIu64 , pTable->tableId, pTable->uid);
return pTable;
}
int32_t mgmtDropNormalTable(SQueuedMsg *newMsg, SNormalTableObj *pTable) {
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
mError("table:%s, failed to drop normal table, vgroup not exist", pTable->tableId);
return TSDB_CODE_OTHERS;
}
SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg));
if (pDrop == NULL) {
mError("table:%s, failed to drop normal table, no enough memory", pTable->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pDrop->tableId, pTable->tableId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->vgId = htonl(pVgroup->vgId);
pDrop->sid = htonl(pTable->sid);
pDrop->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mTrace("table:%s, send drop table msg", pDrop->tableId);
SRpcMsg rpcMsg = {
.handle = newMsg,
.pCont = pDrop,
.contLen = sizeof(SMDDropTableMsg),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
newMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
}
void* mgmtGetNormalTable(char *tableId) {
return sdbGetRow(tsNormalTableSdb, tableId);
}
static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) {
SSchema *schema = (SSchema *) pTable->schema;
for (int32_t i = 0; i < pTable->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) {
return i;
}
}
return -1;
}
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
for (int32_t i = 0; i < ncols; i++) {
if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR;
}
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to andy account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols);
memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns);
for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++;
}
pTable->numOfColumns += ncols;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
desc.rowData = pTable;
desc.rowSize = tsNormalTableUpdateSize;
sdbUpdateRow(&desc);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) {
int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName);
if (col < 0) {
return TSDB_CODE_APP_ERROR;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("table: %s not belongs to any database", pTable->tableId);
return TSDB_CODE_APP_ERROR;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct == NULL) {
mError("DB: %s not belongs to any account", pDb->name);
return TSDB_CODE_APP_ERROR;
}
memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pTable->numOfColumns - col - 1));
pTable->numOfColumns--;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
SSdbOperDesc desc = {0};
desc.type = SDB_OPER_TYPE_GLOBAL;
desc.pObj = pTable;
desc.table = tsNormalTableSdb;
desc.rowData = pTable;
desc.rowSize = tsNormalTableUpdateSize;
sdbUpdateRow(&desc);
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
}
return numOfCols * sizeof(SSchema);
}
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgId = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
strncpy(pMeta->tableId, pTable->tableId, tListLen(pTable->tableId));
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
return TSDB_CODE_SUCCESS;
}
void mgmtDropAllNormalTables(SDbObj *pDropDb) {
void *pNode = NULL;
void *pLastNode = NULL;
int32_t numOfTables = 0;
int32_t dbNameLen = strlen(pDropDb->name);
SNormalTableObj *pTable = NULL;
while (1) {
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) {
SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL,
.table = tsNormalTableSdb,
.pObj = pTable,
};
sdbDeleteRow(&oper);
pNode = pLastNode;
numOfTables++;
continue;
}
}
mTrace("db:%s, all normal tables:%d is dropped from sdb", pDropDb->name, numOfTables);
}
...@@ -678,7 +678,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { ...@@ -678,7 +678,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(pMsg->thandle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -702,7 +702,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { ...@@ -702,7 +702,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(pMsg->thandle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -726,7 +726,7 @@ void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { ...@@ -726,7 +726,7 @@ void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(pMsg->thandle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
rpcRsp.code = TSDB_CODE_INVALID_USER; rpcRsp.code = TSDB_CODE_INVALID_USER;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtGrant.h" #include "mgmtGrant.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -138,13 +137,19 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -138,13 +137,19 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
if (mgmtCheckExpired()) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
return;
}
if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) { if (tsMgmtProcessShellMsgFp[rpcMsg->msgType] == NULL) {
mgmtProcessUnSupportMsg(rpcMsg); mgmtProcessUnSupportMsg(rpcMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); bool usePublicIp = false;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
if (pUser == NULL) { if (pUser == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -158,6 +163,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -158,6 +163,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
queuedMsg.contLen = rpcMsg->contLen; queuedMsg.contLen = rpcMsg->contLen;
queuedMsg.pCont = rpcMsg->pCont; queuedMsg.pCont = rpcMsg->pCont;
queuedMsg.pUser = pUser; queuedMsg.pUser = pUser;
queuedMsg.usePublicIp = usePublicIp;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg); (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
} else { } else {
...@@ -167,6 +173,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -167,6 +173,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
queuedMsg->contLen = rpcMsg->contLen; queuedMsg->contLen = rpcMsg->contLen;
queuedMsg->pCont = rpcMsg->pCont; queuedMsg->pCont = rpcMsg->pCont;
queuedMsg->pUser = pUser; queuedMsg->pUser = pUser;
queuedMsg->usePublicIp = usePublicIp;
mgmtAddToShellQueue(queuedMsg); mgmtAddToShellQueue(queuedMsg);
} }
} }
...@@ -440,7 +447,7 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) { ...@@ -440,7 +447,7 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) { static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) || if ((type == TSDB_MSG_TYPE_CM_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) ||
type == TSDB_MSG_TYPE_CM_STABLE_META || type == TSDB_MSG_TYPE_RETRIEVE || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_SHOW || type == TSDB_MSG_TYPE_CM_TABLES_META ||
type == TSDB_MSG_TYPE_CM_CONNECT) { type == TSDB_MSG_TYPE_CM_CONNECT) {
return true; return true;
......
...@@ -15,10 +15,12 @@ ...@@ -15,10 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "name.h"
#include "tsqlfunction.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtChildTable.h" #include "mgmtChildTable.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtGrant.h" #include "mgmtGrant.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -27,14 +29,11 @@ ...@@ -27,14 +29,11 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#include "mnode.h"
#include "name.h"
#include "tsqlfunction.h"
static void *tsSuperTableSdb; static void *tsSuperTableSdb;
static int32_t tsSuperTableUpdateSize; static int32_t tsSuperTableUpdateSize;
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg);
static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg);
static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
...@@ -49,8 +48,8 @@ static int32_t mgmtSuperTableActionDestroy(SSdbOperDesc *pOper) { ...@@ -49,8 +48,8 @@ static int32_t mgmtSuperTableActionDestroy(SSdbOperDesc *pOper) {
} }
static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
STableInfo *pStable = pOper->pObj; SSuperTableObj *pStable = pOper->pObj;
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb != NULL) { if (pDb != NULL) {
mgmtAddSuperTableIntoDb(pDb); mgmtAddSuperTableIntoDb(pDb);
} }
...@@ -58,8 +57,8 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { ...@@ -58,8 +57,8 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
} }
static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
STableInfo *pStable = pOper->pObj; SSuperTableObj *pStable = pOper->pObj;
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb != NULL) { if (pDb != NULL) {
mgmtRemoveSuperTableFromDb(pDb); mgmtRemoveSuperTableFromDb(pDb);
mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable);
...@@ -132,8 +131,10 @@ int32_t mgmtInitSuperTables() { ...@@ -132,8 +131,10 @@ int32_t mgmtInitSuperTables() {
return -1; return -1;
} }
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mgmtGetShowSuperTableMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mgmtRetrieveShowSuperTables);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropStableRsp);
mTrace("stables is initialized"); mTrace("stables is initialized");
return 0; return 0;
...@@ -143,17 +144,17 @@ void mgmtCleanUpSuperTables() { ...@@ -143,17 +144,17 @@ void mgmtCleanUpSuperTables() {
sdbCloseTable(tsSuperTableSdb); sdbCloseTable(tsSuperTableSdb);
} }
int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { void mgmtCreateSuperTable(SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj)); SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj));
if (pStable == NULL) { if (pStable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
return;
} }
strcpy(pStable->tableId, pCreate->tableId); strcpy(pStable->info.tableId, pCreate->tableId);
pStable->type = TSDB_SUPER_TABLE; pStable->info.type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs(); pStable->createdTime = taosGetTimestampMs();
pStable->vgId = 0;
pStable->sid = 0;
pStable->uid = (((uint64_t) pStable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul)); pStable->uid = (((uint64_t) pStable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul));
pStable->sversion = 0; pStable->sversion = 0;
pStable->numOfColumns = htons(pCreate->numOfColumns); pStable->numOfColumns = htons(pCreate->numOfColumns);
...@@ -165,7 +166,8 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { ...@@ -165,7 +166,8 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
if (pStable->schema == NULL) { if (pStable->schema == NULL) {
free(pStable); free(pStable);
mError("stable:%s, no schema input", pCreate->tableId); mError("stable:%s, no schema input", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
} }
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
...@@ -186,17 +188,17 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) { ...@@ -186,17 +188,17 @@ int32_t mgmtCreateSuperTable(SCMCreateTableMsg *pCreate) {
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mgmtDestroySuperTable(pStable); mgmtDestroySuperTable(pStable);
return TSDB_CODE_SDB_ERROR; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR);
} else { } else {
mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->tableId, pStable->numOfTags, pStable->numOfColumns); mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns);
return TSDB_CODE_SUCCESS; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
} }
} }
int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pStable) { void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pStable) {
if (pStable->numOfTables != 0) { if (pStable->numOfTables != 0) {
mError("stable:%s, numOfTables:%d not 0", pStable->tableId, pStable->numOfTables); mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables);
return TSDB_CODE_OTHERS; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
} else { } else {
SSdbOperDesc oper = { SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_GLOBAL, .type = SDB_OPER_TYPE_GLOBAL,
...@@ -204,8 +206,8 @@ int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pSta ...@@ -204,8 +206,8 @@ int32_t mgmtDropSuperTable(SQueuedMsg *newMsg, SDbObj *pDb, SSuperTableObj *pSta
.pObj = pStable .pObj = pStable
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->tableId, tstrerror(code)); mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code));
return code; mgmtSendSimpleResp(pMsg->thandle, code);
} }
} }
...@@ -213,14 +215,14 @@ void* mgmtGetSuperTable(char *tableId) { ...@@ -213,14 +215,14 @@ void* mgmtGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId); return sdbGetRow(tsSuperTableSdb, tableId);
} }
void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = htonl(1); rsp->numOfDnodes = htonl(1);
rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp));
return rsp; return rsp;
} }
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { static int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) {
for (int32_t i = 0; i < pStable->numOfTags; i++) { for (int32_t i = 0; i < pStable->numOfTags; i++) {
SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema)); SSchema *schema = (SSchema *)(pStable->schema + (pStable->numOfColumns + i) * sizeof(SSchema));
if (strcasecmp(tagName, schema->name) == 0) { if (strcasecmp(tagName, schema->name) == 0) {
...@@ -231,7 +233,7 @@ int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) ...@@ -231,7 +233,7 @@ int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName)
return -1; return -1;
} }
int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) { static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ntags) {
if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) { if (pStable->numOfTags + ntags > TSDB_MAX_TAGS) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -245,9 +247,9 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ...@@ -245,9 +247,9 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t
} }
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("meter: %s not belongs to any database", pStable->tableId); mError("meter: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -275,19 +277,19 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t ...@@ -275,19 +277,19 @@ int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], int32_t
pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables); pAcct->acctInfo.numOfTimeSeries += (ntags * pStable->numOfTables);
// sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL); // sdbUpdateRow(tsSuperTableSdb, pStable, tsSuperTableUpdateSize, SDB_OPER_GLOBAL);
mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->tableId); mTrace("Succeed to add tag column %s to table %s", schema[0].name, pStable->info.tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName);
if (col <= 0 || col >= pStable->numOfTags) { if (col <= 0 || col >= pStable->numOfTags) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("table: %s not belongs to any database", pStable->tableId); mError("table: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -311,15 +313,15 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { ...@@ -311,15 +313,15 @@ int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) { static int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagName, char *newTagName) {
int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName); int32_t col = mgmtFindSuperTableTagIndex(pStable, oldTagName);
if (col < 0) { if (col < 0) {
// Tag name does not exist // Tag name does not exist
mError("Failed to modify table %s tag column, oname: %s, nname: %s", pStable->tableId, oldTagName, newTagName); mError("Failed to modify table %s tag column, oname: %s, nname: %s", pStable->info.tableId, oldTagName, newTagName);
return TSDB_CODE_INVALID_MSG_TYPE; return TSDB_CODE_INVALID_MSG_TYPE;
} }
int32_t rowSize = 0; // int32_t rowSize = 0;
uint32_t len = strlen(newTagName); uint32_t len = strlen(newTagName);
if (col >= pStable->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) { if (col >= pStable->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindSuperTableTagIndex(pStable, newTagName) >= 0) {
...@@ -343,11 +345,11 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN ...@@ -343,11 +345,11 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
tfree(msg); tfree(msg);
if (ret < 0) { if (ret < 0) {
mError("Failed to modify table %s tag column", pStable->tableId); mError("Failed to modify table %s tag column", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
mTrace("Succeed to modify table %s tag column", pStable->tableId); mTrace("Succeed to modify table %s tag column", pStable->info.tableId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -362,7 +364,7 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN ...@@ -362,7 +364,7 @@ static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colN
return -1; return -1;
} }
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) { static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) { if (ncols <= 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -373,9 +375,9 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32 ...@@ -373,9 +375,9 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32
} }
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("meter: %s not belongs to any database", pStable->tableId); mError("meter: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -406,15 +408,15 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32 ...@@ -406,15 +408,15 @@ int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[], int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { static int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName); int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName);
if (col < 0) { if (col < 0) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
SDbObj *pDb = mgmtGetDbByTableId(pStable->tableId); SDbObj *pDb = mgmtGetDbByTableId(pStable->info.tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("table: %s not belongs to any database", pStable->tableId); mError("table: %s not belongs to any database", pStable->info.tableId);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
...@@ -501,7 +503,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -501,7 +503,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
SDbObj *pDb = mgmtGetDb(pShow->db); SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) { if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) {
...@@ -519,12 +521,12 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -519,12 +521,12 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable); pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
if (strncmp(pTable->tableId, prefix, prefixLen)) { if (strncmp(pTable->info.tableId, prefix, prefixLen)) {
continue; continue;
} }
memset(stableName, 0, tListLen(stableName)); memset(stableName, 0, tListLen(stableName));
mgmtExtractTableName(pTable->tableId, stableName); mgmtExtractTableName(pTable->info.tableId, stableName);
if (pShow->payloadLen > 0 && if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
...@@ -572,7 +574,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -572,7 +574,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
break; break;
} }
if (strncmp(pDropDb->name, pTable->tableId, dbNameLen) == 0) { if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) {
SSdbOperDesc oper = { SSdbOperDesc oper = {
.type = SDB_OPER_TYPE_LOCAL, .type = SDB_OPER_TYPE_LOCAL,
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
...@@ -588,14 +590,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { ...@@ -588,14 +590,6 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables); mTrace("db:%s, all super tables:%d is dropped from sdb", pDropDb->name, numOfTables);
} }
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables++;
}
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables--;
}
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -609,32 +603,70 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { ...@@ -609,32 +603,70 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
} }
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) {
SDbObj *pDb = pMsg->pDb;
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
pMeta->uid = htobe64(pTable->uid); pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgId = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion); pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision; pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = pTable->numOfTags; pMeta->numOfTags = (uint8_t)pTable->numOfTags;
pMeta->numOfColumns = htons(pTable->numOfColumns); pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->tableType = pTable->type; pMeta->tableType = pTable->info.type;
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable);
strcpy(pMeta->tableId, pTable->tableId); strcpy(pMeta->tableId, pTable->info.tableId);
return TSDB_CODE_SUCCESS; SRpcMsg rpcRsp = {
.handle = pMsg->thandle,
.pCont = pMeta,
.contLen = pMeta->contLen,
};
pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(&rpcRsp);
mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid);
} }
int32_t mgmtExtractTableName(const char* tableId, char* name) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
int pos = -1; SCMSuperTableInfoMsg *pInfo = pMsg->pCont;
int num = 0; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId);
for (pos = 0; tableId[pos] != 0; ++pos) { if (pTable == NULL) {
if (tableId[pos] == '.') num++; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
if (num == 2) break; return;
}
SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable);
if (pRsp != NULL) {
int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t);
SRpcMsg rpcRsp = {0};
rpcRsp.handle = pMsg->thandle;
rpcRsp.pCont = pRsp;
rpcRsp.contLen = msgLen;
rpcSendResponse(&rpcRsp);
} else {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
} }
}
if (num == 2) { void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) {
strcpy(name, tableId + pos + 1); int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
} SCMAlterTableMsg *pAlter = pMsg->pCont;
return 0;
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
code = mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
code = mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
code = mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
code = mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1);
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name);
} else {}
mgmtSendSimpleResp(pMsg->thandle, code);
} }
static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) {
mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
}
\ No newline at end of file
此差异已折叠。
...@@ -25,14 +25,14 @@ ...@@ -25,14 +25,14 @@
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
void *tsUserSdb = NULL; static void *tsUserSdb = NULL;
static int32_t tsUserUpdateSize = 0; static int32_t tsUserUpdateSize = 0;
static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass); static int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
static int32_t mgmtDropUser(SAcctObj *pAcct, char *name); static int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
static int32_t mgmtUpdateUser(SUserObj *pUser); static int32_t mgmtUpdateUser(SUserObj *pUser);
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg);
...@@ -231,7 +231,7 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) { ...@@ -231,7 +231,7 @@ static int32_t mgmtDropUser(SAcctObj *pAcct, char *name) {
} }
static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mgmtGetUserFromConn(pConn); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) { if (pUser == NULL) {
return TSDB_CODE_INVALID_USER; return TSDB_CODE_INVALID_USER;
} }
...@@ -310,9 +310,12 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void ...@@ -310,9 +310,12 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
return numOfRows; return numOfRows;
} }
SUserObj *mgmtGetUserFromConn(void *pConn) { SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) {
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
if (rpcGetConnInfo(pConn, &connInfo) == 0) { if (rpcGetConnInfo(pConn, &connInfo) == 0) {
if (usePublicIp) {
*usePublicIp = (connInfo.serverIp == tsPublicIpInt);
}
return mgmtGetUser(connInfo.user); return mgmtGetUser(connInfo.user);
} }
......
...@@ -20,10 +20,12 @@ ...@@ -20,10 +20,12 @@
#include "tstatus.h" #include "tstatus.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -66,8 +68,8 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { ...@@ -66,8 +68,8 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
pVgroup->prev = NULL; pVgroup->prev = NULL;
pVgroup->next = NULL; pVgroup->next = NULL;
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxSessions;
pVgroup->tableList = (STableInfo **)calloc(pDb->cfg.maxSessions, sizeof(STableInfo *)); pVgroup->tableList = calloc(pDb->cfg.maxSessions, sizeof(SChildTableObj *));
if (pVgroup->tableList == NULL) { if (pVgroup->tableList == NULL) {
mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size); mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
return -1; return -1;
...@@ -111,8 +113,8 @@ static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) { ...@@ -111,8 +113,8 @@ static int32_t mgmtVgroupActionUpdate(SSdbOperDesc *pOper) {
if (pDb->cfg.maxSessions != oldTables) { if (pDb->cfg.maxSessions != oldTables) {
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions); mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxSessions);
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions); taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxSessions);
int32_t size = sizeof(STableInfo *) * pDb->cfg.maxSessions; int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxSessions;
pVgroup->tableList = (STableInfo **)realloc(pVgroup->tableList, size); pVgroup->tableList = (SChildTableObj **)realloc(pVgroup->tableList, size);
} }
} }
...@@ -276,9 +278,9 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -276,9 +278,9 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t maxReplica = 0; int32_t maxReplica = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
STableInfo *pTable = NULL; SChildTableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload); pTable = mgmtGetChildTable(pShow->payload);
if (NULL == pTable) { if (NULL == pTable) {
return TSDB_CODE_INVALID_TABLE_ID; return TSDB_CODE_INVALID_TABLE_ID;
} }
...@@ -428,7 +430,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -428,7 +430,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
return numOfRows; return numOfRows;
} }
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) { if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) {
pVgroup->tableList[pTable->sid] = pTable; pVgroup->tableList[pTable->sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
...@@ -439,7 +441,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) { ...@@ -439,7 +441,7 @@ void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable) {
mgmtAddVgroupIntoDbTail(pVgroup); mgmtAddVgroupIntoDbTail(pVgroup);
} }
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable) { void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) { if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) {
pVgroup->tableList[pTable->sid] = NULL; pVgroup->tableList[pTable->sid] = NULL;
taosFreeId(pVgroup->idPool, pTable->sid); taosFreeId(pVgroup->idPool, pTable->sid);
...@@ -482,20 +484,6 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { ...@@ -482,20 +484,6 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
return pVnode; return pVnode;
} }
SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
if (vnode < 0 || vnode >= TSDB_MAX_VNODES) {
return NULL;
}
SDnodeObj *pDnode = mgmtGetDnode(dnode);
if (pDnode == NULL) {
return NULL;
}
int32_t vgId = pDnode->vload[vnode].vgId;
return mgmtGetVgroup(vgId);
}
SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
SRpcIpSet ipSet = { SRpcIpSet ipSet = {
.numOfIps = pVgroup->numOfVnodes, .numOfIps = pVgroup->numOfVnodes,
...@@ -659,19 +647,26 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { ...@@ -659,19 +647,26 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
if (mgmtCheckRedirect(rpcMsg->handle)) return; if (mgmtCheckRedirect(rpcMsg->handle)) return;
SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont; SDMConfigVnodeMsg *pCfg = (SDMConfigVnodeMsg *) rpcMsg->pCont;
pCfg->dnode = htonl(pCfg->dnode); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->vnode = htonl(pCfg->vnode); pCfg->vgId = htonl(pCfg->vgId);
SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnodeId);
if (pDnode == NULL) {
mTrace("dnode:%s, invalid dnode", taosIpStr(pCfg->dnodeId), pCfg->vgId);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return;
}
SVgObj *pVgroup = mgmtGetVgroupByVnode(pCfg->dnode, pCfg->vnode); SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mTrace("dnode:%s, vnode:%d, no vgroup info", taosIpStr(pCfg->dnode), pCfg->vnode); mTrace("dnode:%s, vgId:%d, no vgroup info", taosIpStr(pCfg->dnodeId), pCfg->vgId);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
return; return;
} }
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL); mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL);
} }
......
...@@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) { ...@@ -363,7 +363,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
// for TDengine, all the query, show commands shall have TCP connection // for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType; char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_CM_STABLE_META || type == TSDB_MSG_TYPE_CM_TABLES_META || type == TSDB_MSG_TYPE_CM_STABLE_VGROUP || type == TSDB_MSG_TYPE_CM_TABLES_META ||
type == TSDB_MSG_TYPE_CM_SHOW ) type == TSDB_MSG_TYPE_CM_SHOW )
pContext->connType = RPC_CONN_TCPC; pContext->connType = RPC_CONN_TCPC;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册