Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
e4e86609
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e4e86609
编写于
11月 06, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1915
上级
4bc9284a
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
141 addition
and
139 deletion
+141
-139
src/dnode/src/dnodeMPeer.c
src/dnode/src/dnodeMPeer.c
+1
-1
src/dnode/src/dnodeMRead.c
src/dnode/src/dnodeMRead.c
+1
-1
src/dnode/src/dnodeMWrite.c
src/dnode/src/dnodeMWrite.c
+1
-1
src/dnode/src/dnodePeer.c
src/dnode/src/dnodePeer.c
+1
-0
src/inc/mnode.h
src/inc/mnode.h
+8
-8
src/inc/taosdef.h
src/inc/taosdef.h
+29
-28
src/mnode/inc/mnodeDef.h
src/mnode/inc/mnodeDef.h
+4
-4
src/mnode/inc/mnodeTable.h
src/mnode/inc/mnodeTable.h
+2
-2
src/mnode/inc/mnodeVgroup.h
src/mnode/inc/mnodeVgroup.h
+2
-2
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+1
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+87
-87
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+3
-3
src/plugins/http/src/httpQueue.c
src/plugins/http/src/httpQueue.c
+1
-1
未找到文件。
src/dnode/src/dnodeMPeer.c
浏览文件 @
e4e86609
...
...
@@ -128,7 +128,7 @@ void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) {
return
;
}
SMnodeMsg
*
pPeer
=
(
SMnodeMsg
*
)
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
SMnodeMsg
*
pPeer
=
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
mnodeCreateMsg
(
pPeer
,
pMsg
);
taosWriteQitem
(
tsMPeerQueue
,
TAOS_QTYPE_RPC
,
pPeer
);
}
...
...
src/dnode/src/dnodeMRead.c
浏览文件 @
e4e86609
...
...
@@ -129,7 +129,7 @@ void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
return
;
}
SMnodeMsg
*
pRead
=
(
SMnodeMsg
*
)
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
SMnodeMsg
*
pRead
=
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
mnodeCreateMsg
(
pRead
,
pMsg
);
taosWriteQitem
(
tsMReadQueue
,
TAOS_QTYPE_RPC
,
pRead
);
}
...
...
src/dnode/src/dnodeMWrite.c
浏览文件 @
e4e86609
...
...
@@ -129,7 +129,7 @@ void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
return
;
}
SMnodeMsg
*
pWrite
=
(
SMnodeMsg
*
)
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
SMnodeMsg
*
pWrite
=
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
mnodeCreateMsg
(
pWrite
,
pMsg
);
dDebug
(
"app:%p:%p, msg:%s is put into mwrite queue:%p"
,
pWrite
->
rpcMsg
.
ahandle
,
pWrite
,
...
...
src/dnode/src/dnodePeer.c
浏览文件 @
e4e86609
...
...
@@ -19,6 +19,7 @@
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
...
...
src/inc/mnode.h
浏览文件 @
e4e86609
...
...
@@ -35,7 +35,13 @@ typedef struct {
}
SMnodeRsp
;
typedef
struct
SMnodeMsg
{
SRpcMsg
rpcMsg
;
struct
SAcctObj
*
pAcct
;
struct
SDnodeObj
*
pDnode
;
struct
SUserObj
*
pUser
;
struct
SDbObj
*
pDb
;
struct
SVgObj
*
pVgroup
;
struct
STableObj
*
pTable
;
struct
SSTableObj
*
pSTable
;
SMnodeRsp
rpcRsp
;
int8_t
received
;
int8_t
successed
;
...
...
@@ -43,13 +49,7 @@ typedef struct SMnodeMsg {
int8_t
retry
;
int32_t
code
;
void
*
pObj
;
struct
SAcctObj
*
pAcct
;
struct
SDnodeObj
*
pDnode
;
struct
SUserObj
*
pUser
;
struct
SDbObj
*
pDb
;
struct
SVgObj
*
pVgroup
;
struct
STableObj
*
pTable
;
struct
SSuperTableObj
*
pSTable
;
SRpcMsg
rpcMsg
;
}
SMnodeMsg
;
void
mnodeCreateMsg
(
SMnodeMsg
*
pMsg
,
SRpcMsg
*
rpcMsg
);
...
...
src/inc/taosdef.h
浏览文件 @
e4e86609
...
...
@@ -424,42 +424,43 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12
#define TSDB_PORT_ARBITRATOR 12
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
#define TAOS_QTYPE_CQ 3
#define TAOS_QTYPE_QUERY 4
typedef
enum
{
TAOS_QTYPE_RPC
=
0
,
TAOS_QTYPE_FWD
=
1
,
TAOS_QTYPE_WAL
=
2
,
TAOS_QTYPE_CQ
=
3
,
TAOS_QTYPE_QUERY
=
4
}
EQType
;
typedef
enum
{
TSDB_SUPER_TABLE
=
0
,
// super table
TSDB_CHILD_TABLE
=
1
,
// table created from super table
TSDB_NORMAL_TABLE
=
2
,
// ordinary table
TSDB_STREAM_TABLE
=
3
,
// table created from stream computing
TSDB_TABLE_MAX
=
4
TSDB_SUPER_TABLE
=
0
,
// super table
TSDB_CHILD_TABLE
=
1
,
// table created from super table
TSDB_NORMAL_TABLE
=
2
,
// ordinary table
TSDB_STREAM_TABLE
=
3
,
// table created from stream computing
TSDB_TABLE_MAX
=
4
}
ETableType
;
typedef
enum
{
TSDB_MOD_MNODE
,
TSDB_MOD_HTTP
,
TSDB_MOD_MONITOR
,
TSDB_MOD_MQTT
,
TSDB_MOD_MAX
TSDB_MOD_MNODE
=
0
,
TSDB_MOD_HTTP
=
1
,
TSDB_MOD_MONITOR
=
2
,
TSDB_MOD_MQTT
=
3
,
TSDB_MOD_MAX
=
4
}
EModuleType
;
typedef
enum
{
TSDB_CHECK_ITEM_NETWORK
,
TSDB_CHECK_ITEM_MEM
,
TSDB_CHECK_ITEM_CPU
,
TSDB_CHECK_ITEM_DISK
,
TSDB_CHECK_ITEM_OS
,
TSDB_CHECK_ITEM_ACCESS
,
TSDB_CHECK_ITEM_VERSION
,
TSDB_CHECK_ITEM_DATAFILE
,
TSDB_CHECK_ITEM_MAX
}
ECheckItemType
;
typedef
enum
{
TSDB_CHECK_ITEM_NETWORK
,
TSDB_CHECK_ITEM_MEM
,
TSDB_CHECK_ITEM_CPU
,
TSDB_CHECK_ITEM_DISK
,
TSDB_CHECK_ITEM_OS
,
TSDB_CHECK_ITEM_ACCESS
,
TSDB_CHECK_ITEM_VERSION
,
TSDB_CHECK_ITEM_DATAFILE
,
TSDB_CHECK_ITEM_MAX
}
ECheckItemType
;
#ifdef __cplusplus
}
...
...
src/mnode/inc/mnodeDef.h
浏览文件 @
e4e86609
...
...
@@ -89,7 +89,7 @@ typedef struct STableObj {
int8_t
type
;
}
STableObj
;
typedef
struct
SS
uper
TableObj
{
typedef
struct
SSTableObj
{
STableObj
info
;
int8_t
reserved0
[
9
];
// for fill struct STableObj to 4byte align
int16_t
nextColId
;
...
...
@@ -104,7 +104,7 @@ typedef struct SSuperTableObj {
int32_t
numOfTables
;
SSchema
*
schema
;
void
*
vgHash
;
}
SS
uper
TableObj
;
}
SSTableObj
;
typedef
struct
{
STableObj
info
;
...
...
@@ -122,8 +122,8 @@ typedef struct {
int32_t
refCount
;
char
*
sql
;
//used by normal table
SSchema
*
schema
;
//used by normal table
SS
uperTableObj
*
superTable
;
}
SC
hild
TableObj
;
SS
TableObj
*
superTable
;
}
SCTableObj
;
typedef
struct
{
int32_t
dnodeId
;
...
...
src/mnode/inc/mnodeTable.h
浏览文件 @
e4e86609
...
...
@@ -29,8 +29,8 @@ int64_t mnodeGetChildTableNum();
void
*
mnodeGetTable
(
char
*
tableId
);
void
mnodeIncTableRef
(
void
*
pTable
);
void
mnodeDecTableRef
(
void
*
pTable
);
void
*
mnodeGetNextChildTable
(
void
*
pIter
,
SC
hild
TableObj
**
pTable
);
void
*
mnodeGetNextSuperTable
(
void
*
pIter
,
SS
uper
TableObj
**
pTable
);
void
*
mnodeGetNextChildTable
(
void
*
pIter
,
SCTableObj
**
pTable
);
void
*
mnodeGetNextSuperTable
(
void
*
pIter
,
SSTableObj
**
pTable
);
void
mnodeDropAllChildTables
(
SDbObj
*
pDropDb
);
void
mnodeDropAllSuperTables
(
SDbObj
*
pDropDb
);
void
mnodeDropAllChildTablesInVgroups
(
SVgObj
*
pVgroup
);
...
...
src/mnode/inc/mnodeVgroup.h
浏览文件 @
e4e86609
...
...
@@ -43,8 +43,8 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle);
void
mnodeAlterVgroup
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
int32_t
mnodeGetAvailableVgroup
(
struct
SMnodeMsg
*
pMsg
,
SVgObj
**
pVgroup
,
int32_t
*
sid
);
void
mnodeAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SC
hild
TableObj
*
pTable
);
void
mnodeRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SC
hild
TableObj
*
pTable
);
void
mnodeAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SCTableObj
*
pTable
);
void
mnodeRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SCTableObj
*
pTable
);
void
mnodeSendDropVnodeMsg
(
int32_t
vgId
,
SRpcEpSet
*
epSet
,
void
*
ahandle
);
void
mnodeSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
void
mnodeSendAlterVgroupMsg
(
SVgObj
*
pVgroup
);
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
e4e86609
...
...
@@ -1043,7 +1043,7 @@ void sdbFreeWritequeue() {
int32_t
sdbWriteToQueue
(
void
*
param
,
void
*
data
,
int32_t
qtype
,
void
*
pMsg
)
{
SWalHead
*
pHead
=
data
;
int32_t
size
=
sizeof
(
SWalHead
)
+
pHead
->
len
;
SWalHead
*
pWal
=
(
SWalHead
*
)
taosAllocateQitem
(
size
);
SWalHead
*
pWal
=
taosAllocateQitem
(
size
);
memcpy
(
pWal
,
pHead
,
size
);
taosWriteQitem
(
tsSdbWriteQueue
,
qtype
,
pWal
);
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
e4e86609
...
...
@@ -52,9 +52,9 @@ static int32_t tsSuperTableUpdateSize;
static
void
*
mnodeGetChildTable
(
char
*
tableId
);
static
void
*
mnodeGetSuperTable
(
char
*
tableId
);
static
void
*
mnodeGetSuperTableByUid
(
uint64_t
uid
);
static
void
mnodeDropAllChildTablesInStable
(
SS
uper
TableObj
*
pStable
);
static
void
mnodeAddTableIntoStable
(
SS
uperTableObj
*
pStable
,
SChild
TableObj
*
pCtable
);
static
void
mnodeRemoveTableFromStable
(
SS
uperTableObj
*
pStable
,
SChild
TableObj
*
pCtable
);
static
void
mnodeDropAllChildTablesInStable
(
SSTableObj
*
pStable
);
static
void
mnodeAddTableIntoStable
(
SS
TableObj
*
pStable
,
SC
TableObj
*
pCtable
);
static
void
mnodeRemoveTableFromStable
(
SS
TableObj
*
pStable
,
SC
TableObj
*
pCtable
);
static
int32_t
mnodeGetShowTableMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mnodeRetrieveShowTables
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
...
...
@@ -86,9 +86,9 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg);
static
int32_t
mnodeProcessAlterTableMsg
(
SMnodeMsg
*
pMsg
);
static
void
mnodeProcessAlterTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mnodeFindSuperTableColumnIndex
(
SS
uper
TableObj
*
pStable
,
char
*
colName
);
static
int32_t
mnodeFindSuperTableColumnIndex
(
SSTableObj
*
pStable
,
char
*
colName
);
static
void
mnodeDestroyChildTable
(
SC
hild
TableObj
*
pTable
)
{
static
void
mnodeDestroyChildTable
(
SCTableObj
*
pTable
)
{
taosTFree
(
pTable
->
info
.
tableId
);
taosTFree
(
pTable
->
schema
);
taosTFree
(
pTable
->
sql
);
...
...
@@ -101,7 +101,7 @@ static int32_t mnodeChildTableActionDestroy(SSdbOper *pOper) {
}
static
int32_t
mnodeChildTableActionInsert
(
SSdbOper
*
pOper
)
{
SC
hild
TableObj
*
pTable
=
pOper
->
pObj
;
SCTableObj
*
pTable
=
pOper
->
pObj
;
SVgObj
*
pVgroup
=
mnodeGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
@@ -150,7 +150,7 @@ static int32_t mnodeChildTableActionInsert(SSdbOper *pOper) {
}
static
int32_t
mnodeChildTableActionDelete
(
SSdbOper
*
pOper
)
{
SC
hild
TableObj
*
pTable
=
pOper
->
pObj
;
SCTableObj
*
pTable
=
pOper
->
pObj
;
if
(
pTable
->
vgId
==
0
)
{
return
TSDB_CODE_MND_VGROUP_NOT_EXIST
;
}
...
...
@@ -186,8 +186,8 @@ static int32_t mnodeChildTableActionDelete(SSdbOper *pOper) {
}
static
int32_t
mnodeChildTableActionUpdate
(
SSdbOper
*
pOper
)
{
SC
hild
TableObj
*
pNew
=
pOper
->
pObj
;
SC
hild
TableObj
*
pTable
=
mnodeGetChildTable
(
pNew
->
info
.
tableId
);
SCTableObj
*
pNew
=
pOper
->
pObj
;
SCTableObj
*
pTable
=
mnodeGetChildTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSql
=
pTable
->
sql
;
...
...
@@ -195,7 +195,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) {
void
*
oldSTable
=
pTable
->
superTable
;
int32_t
oldRefCount
=
pTable
->
refCount
;
memcpy
(
pTable
,
pNew
,
sizeof
(
SC
hild
TableObj
));
memcpy
(
pTable
,
pNew
,
sizeof
(
SCTableObj
));
pTable
->
refCount
=
oldRefCount
;
pTable
->
sql
=
pNew
->
sql
;
...
...
@@ -213,7 +213,7 @@ static int32_t mnodeChildTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mnodeChildTableActionEncode
(
SSdbOper
*
pOper
)
{
SC
hild
TableObj
*
pTable
=
pOper
->
pObj
;
SCTableObj
*
pTable
=
pOper
->
pObj
;
assert
(
pTable
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
int32_t
len
=
strlen
(
pTable
->
info
.
tableId
);
...
...
@@ -244,7 +244,7 @@ static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) {
static
int32_t
mnodeChildTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SC
hildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChild
TableObj
));
SC
TableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SC
TableObj
));
if
(
pTable
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
int32_t
len
=
strlen
(
pOper
->
rowData
);
...
...
@@ -284,7 +284,7 @@ static int32_t mnodeChildTableActionDecode(SSdbOper *pOper) {
static
int32_t
mnodeChildTableActionRestored
()
{
void
*
pIter
=
NULL
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
while
(
1
)
{
pIter
=
mnodeGetNextChildTable
(
pIter
,
&
pTable
);
...
...
@@ -323,7 +323,7 @@ static int32_t mnodeChildTableActionRestored() {
}
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
SS
uper
TableObj
*
pSuperTable
=
mnodeGetSuperTableByUid
(
pTable
->
suid
);
SSTableObj
*
pSuperTable
=
mnodeGetSuperTableByUid
(
pTable
->
suid
);
if
(
pSuperTable
==
NULL
)
{
mError
(
"ctable:%s, stable:%"
PRIu64
" not exist"
,
pTable
->
info
.
tableId
,
pTable
->
suid
);
pTable
->
vgId
=
0
;
...
...
@@ -344,14 +344,14 @@ static int32_t mnodeChildTableActionRestored() {
}
static
int32_t
mnodeInitChildTables
()
{
SC
hild
TableObj
tObj
;
SCTableObj
tObj
;
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_CTABLE
,
.
tableName
=
"ctables"
,
.
hashSessions
=
TSDB_DEFAULT_CTABLES_HASH_SIZE
,
.
maxRowSize
=
sizeof
(
SC
hild
TableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
+
TSDB_CQ_SQL_SIZE
,
.
maxRowSize
=
sizeof
(
SCTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
+
TSDB_CQ_SQL_SIZE
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_VAR_STRING
,
.
insertFp
=
mnodeChildTableActionInsert
,
...
...
@@ -386,7 +386,7 @@ int64_t mnodeGetChildTableNum() {
return
sdbGetNumOfRows
(
tsChildTableSdb
);
}
static
void
mnodeAddTableIntoStable
(
SS
uperTableObj
*
pStable
,
SChild
TableObj
*
pCtable
)
{
static
void
mnodeAddTableIntoStable
(
SS
TableObj
*
pStable
,
SC
TableObj
*
pCtable
)
{
atomic_add_fetch_32
(
&
pStable
->
numOfTables
,
1
);
if
(
pStable
->
vgHash
==
NULL
)
{
...
...
@@ -402,7 +402,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
}
}
static
void
mnodeRemoveTableFromStable
(
SS
uperTableObj
*
pStable
,
SChild
TableObj
*
pCtable
)
{
static
void
mnodeRemoveTableFromStable
(
SS
TableObj
*
pStable
,
SC
TableObj
*
pCtable
)
{
atomic_sub_fetch_32
(
&
pStable
->
numOfTables
,
1
);
if
(
pStable
->
vgHash
==
NULL
)
return
;
...
...
@@ -416,7 +416,7 @@ static void mnodeRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *
mnodeDecVgroupRef
(
pVgroup
);
}
static
void
mnodeDestroySuperTable
(
SS
uper
TableObj
*
pStable
)
{
static
void
mnodeDestroySuperTable
(
SSTableObj
*
pStable
)
{
if
(
pStable
->
vgHash
!=
NULL
)
{
taosHashCleanup
(
pStable
->
vgHash
);
pStable
->
vgHash
=
NULL
;
...
...
@@ -432,7 +432,7 @@ static int32_t mnodeSuperTableActionDestroy(SSdbOper *pOper) {
}
static
int32_t
mnodeSuperTableActionInsert
(
SSdbOper
*
pOper
)
{
SS
uper
TableObj
*
pStable
=
pOper
->
pObj
;
SSTableObj
*
pStable
=
pOper
->
pObj
;
SDbObj
*
pDb
=
mnodeGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
&&
pDb
->
status
==
TSDB_DB_STATUS_READY
)
{
mnodeAddSuperTableIntoDb
(
pDb
);
...
...
@@ -443,11 +443,11 @@ static int32_t mnodeSuperTableActionInsert(SSdbOper *pOper) {
}
static
int32_t
mnodeSuperTableActionDelete
(
SSdbOper
*
pOper
)
{
SS
uper
TableObj
*
pStable
=
pOper
->
pObj
;
SSTableObj
*
pStable
=
pOper
->
pObj
;
SDbObj
*
pDb
=
mnodeGetDbByTableId
(
pStable
->
info
.
tableId
);
if
(
pDb
!=
NULL
)
{
mnodeRemoveSuperTableFromDb
(
pDb
);
mnodeDropAllChildTablesInStable
((
SS
uper
TableObj
*
)
pStable
);
mnodeDropAllChildTablesInStable
((
SSTableObj
*
)
pStable
);
}
mnodeDecDbRef
(
pDb
);
...
...
@@ -455,8 +455,8 @@ static int32_t mnodeSuperTableActionDelete(SSdbOper *pOper) {
}
static
int32_t
mnodeSuperTableActionUpdate
(
SSdbOper
*
pOper
)
{
SS
uper
TableObj
*
pNew
=
pOper
->
pObj
;
SS
uper
TableObj
*
pTable
=
mnodeGetSuperTable
(
pNew
->
info
.
tableId
);
SSTableObj
*
pNew
=
pOper
->
pObj
;
SSTableObj
*
pTable
=
mnodeGetSuperTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
NULL
&&
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSchema
=
pTable
->
schema
;
...
...
@@ -464,7 +464,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
int32_t
oldRefCount
=
pTable
->
refCount
;
int32_t
oldNumOfTables
=
pTable
->
numOfTables
;
memcpy
(
pTable
,
pNew
,
sizeof
(
SS
uper
TableObj
));
memcpy
(
pTable
,
pNew
,
sizeof
(
SSTableObj
));
pTable
->
vgHash
=
oldVgHash
;
pTable
->
refCount
=
oldRefCount
;
...
...
@@ -480,7 +480,7 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mnodeSuperTableActionEncode
(
SSdbOper
*
pOper
)
{
SS
uper
TableObj
*
pStable
=
pOper
->
pObj
;
SSTableObj
*
pStable
=
pOper
->
pObj
;
assert
(
pOper
->
pObj
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
int32_t
len
=
strlen
(
pStable
->
info
.
tableId
);
...
...
@@ -504,7 +504,7 @@ static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) {
static
int32_t
mnodeSuperTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SS
uperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuper
TableObj
));
SS
TableObj
*
pStable
=
(
SSTableObj
*
)
calloc
(
1
,
sizeof
(
SS
TableObj
));
if
(
pStable
==
NULL
)
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
int32_t
len
=
strlen
(
pOper
->
rowData
);
...
...
@@ -537,14 +537,14 @@ static int32_t mnodeSuperTableActionRestored() {
}
static
int32_t
mnodeInitSuperTables
()
{
SS
uper
TableObj
tObj
;
SSTableObj
tObj
;
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_STABLE
,
.
tableName
=
"stables"
,
.
hashSessions
=
TSDB_DEFAULT_STABLES_HASH_SIZE
,
.
maxRowSize
=
sizeof
(
SS
uper
TableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
,
.
maxRowSize
=
sizeof
(
SSTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_FNAME_LEN
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_VAR_STRING
,
.
insertFp
=
mnodeSuperTableActionInsert
,
...
...
@@ -615,7 +615,7 @@ static void *mnodeGetSuperTable(char *tableId) {
}
static
void
*
mnodeGetSuperTableByUid
(
uint64_t
uid
)
{
SS
uper
TableObj
*
pStable
=
NULL
;
SSTableObj
*
pStable
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
...
...
@@ -647,11 +647,11 @@ void *mnodeGetTable(char *tableId) {
return
NULL
;
}
void
*
mnodeGetNextChildTable
(
void
*
pIter
,
SC
hild
TableObj
**
pTable
)
{
void
*
mnodeGetNextChildTable
(
void
*
pIter
,
SCTableObj
**
pTable
)
{
return
sdbFetchRow
(
tsChildTableSdb
,
pIter
,
(
void
**
)
pTable
);
}
void
*
mnodeGetNextSuperTable
(
void
*
pIter
,
SS
uper
TableObj
**
pTable
)
{
void
*
mnodeGetNextSuperTable
(
void
*
pIter
,
SSTableObj
**
pTable
)
{
return
sdbFetchRow
(
tsSuperTableSdb
,
pIter
,
(
void
**
)
pTable
);
}
...
...
@@ -765,12 +765,12 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
}
if
(
pMsg
->
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
SS
uperTableObj
*
pSTable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pSTable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mInfo
(
"app:%p:%p, table:%s, start to drop stable, uid:%"
PRIu64
", numOfChildTables:%d, sizeOfVgList:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
,
pSTable
->
uid
,
pSTable
->
numOfTables
,
(
int32_t
)
taosHashGetSize
(
pSTable
->
vgHash
));
return
mnodeProcessDropSuperTableMsg
(
pMsg
);
}
else
{
SC
hildTableObj
*
pCTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pCTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
mInfo
(
"app:%p:%p, table:%s, start to drop ctable, vgId:%d tid:%d uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pDrop
->
tableId
,
pCTable
->
vgId
,
pCTable
->
tid
,
pCTable
->
uid
);
return
mnodeProcessDropChildTableMsg
(
pMsg
);
...
...
@@ -816,7 +816,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mnodeCreateSuperTableCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pTable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pTable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
assert
(
pTable
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -835,7 +835,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
if
(
pMsg
==
NULL
)
return
TSDB_CODE_MND_APP_ERROR
;
SCMCreateTableMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
SS
uperTableObj
*
pStable
=
calloc
(
1
,
sizeof
(
SSuper
TableObj
));
SS
TableObj
*
pStable
=
calloc
(
1
,
sizeof
(
SS
TableObj
));
if
(
pStable
==
NULL
)
{
mError
(
"app:%p:%p, table:%s, failed to create, no enough memory"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pCreate
->
tableId
);
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
...
...
@@ -878,7 +878,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
sizeof
(
SS
uper
TableObj
)
+
schemaSize
,
.
rowSize
=
sizeof
(
SSTableObj
)
+
schemaSize
,
.
pMsg
=
pMsg
,
.
writeCb
=
mnodeCreateSuperTableCb
};
...
...
@@ -894,7 +894,7 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mnodeDropSuperTableCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pTable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pTable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"app:%p:%p, stable:%s, failed to drop, sdb error"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
);
}
else
{
...
...
@@ -907,7 +907,7 @@ static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
static
int32_t
mnodeProcessDropSuperTableMsg
(
SMnodeMsg
*
pMsg
)
{
if
(
pMsg
==
NULL
)
return
TSDB_CODE_MND_APP_ERROR
;
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
if
(
pStable
->
vgHash
!=
NULL
/*pStable->numOfTables != 0*/
)
{
SHashMutableIterator
*
pIter
=
taosHashCreateIter
(
pStable
->
vgHash
);
while
(
taosHashIterNext
(
pIter
))
{
...
...
@@ -950,7 +950,7 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
return
code
;
}
static
int32_t
mnodeFindSuperTableTagIndex
(
SS
uper
TableObj
*
pStable
,
const
char
*
tagName
)
{
static
int32_t
mnodeFindSuperTableTagIndex
(
SSTableObj
*
pStable
,
const
char
*
tagName
)
{
SSchema
*
schema
=
(
SSchema
*
)
pStable
->
schema
;
for
(
int32_t
tag
=
0
;
tag
<
pStable
->
numOfTags
;
tag
++
)
{
if
(
strcasecmp
(
schema
[
pStable
->
numOfColumns
+
tag
].
name
,
tagName
)
==
0
)
{
...
...
@@ -962,7 +962,7 @@ static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *
}
static
int32_t
mnodeAddSuperTableTagCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, add tag result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
...
...
@@ -970,7 +970,7 @@ static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
}
static
int32_t
mnodeAddSuperTableTag
(
SMnodeMsg
*
pMsg
,
SSchema
schema
[],
int32_t
ntags
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
if
(
pStable
->
numOfTags
+
ntags
>
TSDB_MAX_TAGS
)
{
mError
(
"app:%p:%p, stable:%s, add tag, too many tags"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
);
return
TSDB_CODE_MND_TOO_MANY_TAGS
;
...
...
@@ -1018,14 +1018,14 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
}
static
int32_t
mnodeDropSuperTableTagCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, drop tag result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
return
code
;
}
static
int32_t
mnodeDropSuperTableTag
(
SMnodeMsg
*
pMsg
,
char
*
tagName
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindSuperTableTagIndex
(
pStable
,
tagName
);
if
(
col
<
0
)
{
mError
(
"app:%p:%p, stable:%s, drop tag, tag:%s not exist"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
...
...
@@ -1052,14 +1052,14 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
}
static
int32_t
mnodeModifySuperTableTagNameCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, modify tag result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
return
code
;
}
static
int32_t
mnodeModifySuperTableTagName
(
SMnodeMsg
*
pMsg
,
char
*
oldTagName
,
char
*
newTagName
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindSuperTableTagIndex
(
pStable
,
oldTagName
);
if
(
col
<
0
)
{
mError
(
"app:%p:%p, stable:%s, failed to modify table tag, oldName: %s, newName: %s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
...
...
@@ -1095,7 +1095,7 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
return
sdbUpdateRow
(
&
oper
);
}
static
int32_t
mnodeFindSuperTableColumnIndex
(
SS
uper
TableObj
*
pStable
,
char
*
colName
)
{
static
int32_t
mnodeFindSuperTableColumnIndex
(
SSTableObj
*
pStable
,
char
*
colName
)
{
SSchema
*
schema
=
(
SSchema
*
)
pStable
->
schema
;
for
(
int32_t
col
=
0
;
col
<
pStable
->
numOfColumns
;
col
++
)
{
if
(
strcasecmp
(
schema
[
col
].
name
,
colName
)
==
0
)
{
...
...
@@ -1107,7 +1107,7 @@ static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *col
}
static
int32_t
mnodeAddSuperTableColumnCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, add column result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
return
code
;
...
...
@@ -1115,7 +1115,7 @@ static int32_t mnodeAddSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static
int32_t
mnodeAddSuperTableColumn
(
SMnodeMsg
*
pMsg
,
SSchema
schema
[],
int32_t
ncols
)
{
SDbObj
*
pDb
=
pMsg
->
pDb
;
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
if
(
ncols
<=
0
)
{
mError
(
"app:%p:%p, stable:%s, add column, ncols:%d <= 0"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
ncols
);
return
TSDB_CODE_MND_APP_ERROR
;
...
...
@@ -1170,7 +1170,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
}
static
int32_t
mnodeDropSuperTableColumnCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, delete column result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
return
code
;
...
...
@@ -1178,7 +1178,7 @@ static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static
int32_t
mnodeDropSuperTableColumn
(
SMnodeMsg
*
pMsg
,
char
*
colName
)
{
SDbObj
*
pDb
=
pMsg
->
pDb
;
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindSuperTableColumnIndex
(
pStable
,
colName
);
if
(
col
<=
0
)
{
mError
(
"app:%p:%p, stable:%s, drop column, column:%s not exist"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
...
...
@@ -1215,14 +1215,14 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
}
static
int32_t
mnodeChangeSuperTableColumnCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, stable %s, change column result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pStable
->
info
.
tableId
,
tstrerror
(
code
));
return
code
;
}
static
int32_t
mnodeChangeSuperTableColumn
(
SMnodeMsg
*
pMsg
,
char
*
oldName
,
char
*
newName
)
{
SS
uperTableObj
*
pStable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pStable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindSuperTableColumnIndex
(
pStable
,
oldName
);
if
(
col
<
0
)
{
mError
(
"app:%p:%p, stable:%s, change column, oldName: %s, newName: %s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
...
...
@@ -1321,7 +1321,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
int32_t
numOfRows
=
0
;
char
*
pWrite
;
int32_t
cols
=
0
;
SS
uper
TableObj
*
pTable
=
NULL
;
SSTableObj
*
pTable
=
NULL
;
char
prefix
[
64
]
=
{
0
};
int32_t
prefixLen
;
...
...
@@ -1399,7 +1399,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
void
mnodeDropAllSuperTables
(
SDbObj
*
pDropDb
)
{
void
*
pIter
=
NULL
;
int32_t
numOfTables
=
0
;
SS
uper
TableObj
*
pTable
=
NULL
;
SSTableObj
*
pTable
=
NULL
;
char
prefix
[
64
]
=
{
0
};
tstrncpy
(
prefix
,
pDropDb
->
name
,
64
);
...
...
@@ -1430,7 +1430,7 @@ void mnodeDropAllSuperTables(SDbObj *pDropDb) {
mInfo
(
"db:%s, all super tables:%d is dropped from sdb"
,
pDropDb
->
name
,
numOfTables
);
}
static
int32_t
mnodeSetSchemaFromSuperTable
(
SSchema
*
pSchema
,
SS
uper
TableObj
*
pTable
)
{
static
int32_t
mnodeSetSchemaFromSuperTable
(
SSchema
*
pSchema
,
SSTableObj
*
pTable
)
{
int32_t
numOfCols
=
pTable
->
numOfColumns
+
pTable
->
numOfTags
;
assert
(
numOfCols
<=
TSDB_MAX_COLUMNS
);
...
...
@@ -1446,7 +1446,7 @@ static int32_t mnodeSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pT
}
static
int32_t
mnodeGetSuperTableMeta
(
SMnodeMsg
*
pMsg
)
{
SS
uperTableObj
*
pTable
=
(
SSuper
TableObj
*
)
pMsg
->
pTable
;
SS
TableObj
*
pTable
=
(
SS
TableObj
*
)
pMsg
->
pTable
;
STableMetaMsg
*
pMeta
=
rpcMallocCont
(
sizeof
(
STableMetaMsg
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
));
if
(
pMeta
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
...
...
@@ -1479,7 +1479,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
int32_t
contLen
=
sizeof
(
SSTableVgroupRspMsg
)
+
32
*
sizeof
(
SVgroupMsg
)
+
sizeof
(
SVgroupsMsg
);
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
++
i
)
{
char
*
stableName
=
(
char
*
)
pInfo
+
sizeof
(
SSTableVgroupMsg
)
+
(
TSDB_TABLE_FNAME_LEN
)
*
i
;
SS
uper
TableObj
*
pTable
=
mnodeGetSuperTable
(
stableName
);
SSTableObj
*
pTable
=
mnodeGetSuperTable
(
stableName
);
if
(
pTable
!=
NULL
&&
pTable
->
vgHash
!=
NULL
)
{
contLen
+=
(
taosHashGetSize
(
pTable
->
vgHash
)
*
sizeof
(
SVgroupMsg
)
+
sizeof
(
SVgroupsMsg
));
}
...
...
@@ -1497,7 +1497,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
++
i
)
{
char
*
stableName
=
(
char
*
)
pInfo
+
sizeof
(
SSTableVgroupMsg
)
+
(
TSDB_TABLE_FNAME_LEN
)
*
i
;
SS
uper
TableObj
*
pTable
=
mnodeGetSuperTable
(
stableName
);
SSTableObj
*
pTable
=
mnodeGetSuperTable
(
stableName
);
if
(
pTable
==
NULL
)
{
mError
(
"app:%p:%p, stable:%s, not exist while get stable vgroup info"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
stableName
);
mnodeDecTableRef
(
pTable
);
...
...
@@ -1569,7 +1569,7 @@ static void mnodeProcessDropSuperTableRsp(SRpcMsg *rpcMsg) {
mInfo
(
"drop stable rsp received, result:%s"
,
tstrerror
(
rpcMsg
->
code
));
}
static
void
*
mnodeBuildCreateChildTableMsg
(
SCMCreateTableMsg
*
pMsg
,
SC
hild
TableObj
*
pTable
)
{
static
void
*
mnodeBuildCreateChildTableMsg
(
SCMCreateTableMsg
*
pMsg
,
SCTableObj
*
pTable
)
{
STagData
*
pTagData
=
NULL
;
int32_t
tagDataLen
=
0
;
int32_t
totalCols
=
0
;
...
...
@@ -1643,7 +1643,7 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
}
static
int32_t
mnodeDoCreateChildTableFp
(
SMnodeMsg
*
pMsg
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
assert
(
pTable
);
mDebug
(
"app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%"
PRIu64
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
...
...
@@ -1669,7 +1669,7 @@ static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
}
static
int32_t
mnodeDoCreateChildTableCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
SCMCreateTableMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
assert
(
pTable
);
...
...
@@ -1699,7 +1699,7 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
static
int32_t
mnodeDoCreateChildTable
(
SMnodeMsg
*
pMsg
,
int32_t
tid
)
{
SVgObj
*
pVgroup
=
pMsg
->
pVgroup
;
SCMCreateTableMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
SC
hildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChild
TableObj
));
SC
TableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SC
TableObj
));
if
(
pTable
==
NULL
)
{
mError
(
"app:%p:%p, table:%s, failed to alloc memory"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pCreate
->
tableId
);
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
...
...
@@ -1842,7 +1842,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
}
static
int32_t
mnodeSendDropChildTableMsg
(
SMnodeMsg
*
pMsg
,
bool
needReturn
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
mLInfo
(
"app:%p:%p, ctable:%s, is dropped from sdb"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
);
SMDDropTableMsg
*
pDrop
=
rpcMallocCont
(
sizeof
(
SMDDropTableMsg
));
...
...
@@ -1880,7 +1880,7 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
static
int32_t
mnodeDropChildTableCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
mError
(
"app:%p:%p, ctable:%s, failed to drop, sdb error"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
);
return
code
;
}
...
...
@@ -1889,7 +1889,7 @@ static int32_t mnodeDropChildTableCb(SMnodeMsg *pMsg, int32_t code) {
}
static
int32_t
mnodeProcessDropChildTableMsg
(
SMnodeMsg
*
pMsg
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
if
(
pMsg
->
pVgroup
==
NULL
)
pMsg
->
pVgroup
=
mnodeGetVgroup
(
pTable
->
vgId
);
if
(
pMsg
->
pVgroup
==
NULL
)
{
mError
(
"app:%p:%p, table:%s, failed to drop ctable, vgroup not exist"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
...
...
@@ -1914,7 +1914,7 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
return
code
;
}
static
int32_t
mnodeFindNormalTableColumnIndex
(
SC
hild
TableObj
*
pTable
,
char
*
colName
)
{
static
int32_t
mnodeFindNormalTableColumnIndex
(
SCTableObj
*
pTable
,
char
*
colName
)
{
SSchema
*
schema
=
(
SSchema
*
)
pTable
->
schema
;
for
(
int32_t
col
=
0
;
col
<
pTable
->
numOfColumns
;
col
++
)
{
if
(
strcasecmp
(
schema
[
col
].
name
,
colName
)
==
0
)
{
...
...
@@ -1926,7 +1926,7 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
}
static
int32_t
mnodeAlterNormalTableColumnCb
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"app:%p:%p, ctable %s, failed to alter column, reason:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
tstrerror
(
code
));
...
...
@@ -1965,7 +1965,7 @@ static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
}
static
int32_t
mnodeAddNormalTableColumn
(
SMnodeMsg
*
pMsg
,
SSchema
schema
[],
int32_t
ncols
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
SDbObj
*
pDb
=
pMsg
->
pDb
;
if
(
ncols
<=
0
)
{
mError
(
"app:%p:%p, ctable:%s, add column, ncols:%d <= 0"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
ncols
);
...
...
@@ -2014,7 +2014,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
static
int32_t
mnodeDropNormalTableColumn
(
SMnodeMsg
*
pMsg
,
char
*
colName
)
{
SDbObj
*
pDb
=
pMsg
->
pDb
;
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindNormalTableColumnIndex
(
pTable
,
colName
);
if
(
col
<=
0
)
{
mError
(
"app:%p:%p, ctable:%s, drop column, column:%s not exist"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
pTable
->
info
.
tableId
,
...
...
@@ -2046,7 +2046,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
}
static
int32_t
mnodeChangeNormalTableColumn
(
SMnodeMsg
*
pMsg
,
char
*
oldName
,
char
*
newName
)
{
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
int32_t
col
=
mnodeFindNormalTableColumnIndex
(
pTable
,
oldName
);
if
(
col
<
0
)
{
mError
(
"app:%p:%p, ctable:%s, change column, oldName: %s, newName: %s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
...
...
@@ -2082,7 +2082,7 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
return
sdbUpdateRow
(
&
oper
);
}
static
int32_t
mnodeSetSchemaFromNormalTable
(
SSchema
*
pSchema
,
SC
hild
TableObj
*
pTable
)
{
static
int32_t
mnodeSetSchemaFromNormalTable
(
SSchema
*
pSchema
,
SCTableObj
*
pTable
)
{
int32_t
numOfCols
=
pTable
->
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
strcpy
(
pSchema
->
name
,
pTable
->
schema
[
i
].
name
);
...
...
@@ -2097,7 +2097,7 @@ static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *p
static
int32_t
mnodeDoGetChildTableMeta
(
SMnodeMsg
*
pMsg
,
STableMetaMsg
*
pMeta
)
{
SDbObj
*
pDb
=
pMsg
->
pDb
;
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
pMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
pMsg
->
pTable
;
pMeta
->
uid
=
htobe64
(
pTable
->
uid
);
pMeta
->
tid
=
htonl
(
pTable
->
tid
);
...
...
@@ -2203,7 +2203,7 @@ static int32_t mnodeGetChildTableMeta(SMnodeMsg *pMsg) {
void
mnodeDropAllChildTablesInVgroups
(
SVgObj
*
pVgroup
)
{
void
*
pIter
=
NULL
;
int32_t
numOfTables
=
0
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
mInfo
(
"vgId:%d, all child tables will be dropped from sdb"
,
pVgroup
->
vgId
);
...
...
@@ -2231,7 +2231,7 @@ void mnodeDropAllChildTablesInVgroups(SVgObj *pVgroup) {
void
mnodeDropAllChildTables
(
SDbObj
*
pDropDb
)
{
void
*
pIter
=
NULL
;
int32_t
numOfTables
=
0
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
char
prefix
[
64
]
=
{
0
};
tstrncpy
(
prefix
,
pDropDb
->
name
,
64
);
...
...
@@ -2261,10 +2261,10 @@ void mnodeDropAllChildTables(SDbObj *pDropDb) {
mInfo
(
"db:%s, all child tables:%d is dropped from sdb"
,
pDropDb
->
name
,
numOfTables
);
}
static
void
mnodeDropAllChildTablesInStable
(
SS
uper
TableObj
*
pStable
)
{
static
void
mnodeDropAllChildTablesInStable
(
SSTableObj
*
pStable
)
{
void
*
pIter
=
NULL
;
int32_t
numOfTables
=
0
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
mInfo
(
"stable:%s uid:%"
PRIu64
", all child tables:%d will be dropped from sdb"
,
pStable
->
info
.
tableId
,
pStable
->
uid
,
pStable
->
numOfTables
);
...
...
@@ -2292,11 +2292,11 @@ static void mnodeDropAllChildTablesInStable(SSuperTableObj *pStable) {
}
#if 0
static SC
hild
TableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) {
static SCTableObj* mnodeGetTableByPos(int32_t vnode, int32_t tid) {
SVgObj *pVgroup = mnodeGetVgroup(vnode);
if (pVgroup == NULL) return NULL;
SC
hild
TableObj *pTable = pVgroup->tableList[tid - 1];
SCTableObj *pTable = pVgroup->tableList[tid - 1];
mnodeIncTableRef((STableObj *)pTable);
mnodeDecVgroupRef(pVgroup);
...
...
@@ -2314,7 +2314,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
mDebug("app:%p:%p, dnode:%d, vgId:%d sid:%d, receive table config msg", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId,
pCfg->vgId, pCfg->sid);
SC
hild
TableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid);
SCTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid);
if (pTable == NULL) {
mError("app:%p:%p, dnode:%d, vgId:%d sid:%d, table not found", pMsg->rpcMsg.ahandle, pMsg, pCfg->dnodeId,
pCfg->vgId, pCfg->sid);
...
...
@@ -2322,7 +2322,7 @@ static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
}
SMDCreateTableMsg *pCreate = NULL;
pCreate = mnodeBuildCreateChildTableMsg(NULL, (SC
hild
TableObj *)pTable);
pCreate = mnodeBuildCreateChildTableMsg(NULL, (SCTableObj *)pTable);
mnodeDecTableRef(pTable);
if (pCreate == NULL) return terrno;
...
...
@@ -2340,7 +2340,7 @@ static void mnodeProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
ahandle
;
mnodeMsg
->
received
++
;
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
mnodeMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
mnodeMsg
->
pTable
;
assert
(
pTable
);
mInfo
(
"app:%p:%p, table:%s, drop table rsp received, vgId:%d sid:%d uid:%"
PRIu64
", thandle:%p result:%s"
,
...
...
@@ -2381,7 +2381,7 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
ahandle
;
mnodeMsg
->
received
++
;
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
mnodeMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
mnodeMsg
->
pTable
;
assert
(
pTable
);
// If the table is deleted by another thread during creation, stop creating and send drop msg to vnode
...
...
@@ -2445,7 +2445,7 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
SMnodeMsg
*
mnodeMsg
=
rpcMsg
->
ahandle
;
mnodeMsg
->
received
++
;
SC
hildTableObj
*
pTable
=
(
SChild
TableObj
*
)
mnodeMsg
->
pTable
;
SC
TableObj
*
pTable
=
(
SC
TableObj
*
)
mnodeMsg
->
pTable
;
assert
(
pTable
);
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
||
rpcMsg
->
code
==
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
)
{
...
...
@@ -2483,7 +2483,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
for
(
int32_t
t
=
0
;
t
<
pInfo
->
numOfTables
;
++
t
)
{
char
*
tableId
=
(
char
*
)(
pInfo
->
tableIds
+
t
*
TSDB_TABLE_FNAME_LEN
);
SC
hild
TableObj
*
pTable
=
mnodeGetChildTable
(
tableId
);
SCTableObj
*
pTable
=
mnodeGetChildTable
(
tableId
);
if
(
pTable
==
NULL
)
continue
;
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mnodeGetDbByTableId
(
tableId
);
...
...
@@ -2607,7 +2607,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
int32_t
cols
=
0
;
int32_t
numOfRows
=
0
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
SPatternCompareInfo
info
=
PATTERN_COMPARE_INFO_INITIALIZER
;
char
prefix
[
64
]
=
{
0
};
...
...
@@ -2843,7 +2843,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
}
int32_t
numOfRows
=
0
;
SC
hild
TableObj
*
pTable
=
NULL
;
SCTableObj
*
pTable
=
NULL
;
SPatternCompareInfo
info
=
PATTERN_COMPARE_INFO_INITIALIZER
;
char
prefix
[
64
]
=
{
0
};
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
e4e86609
...
...
@@ -694,7 +694,7 @@ static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) {
return
true
;
}
SC
hildTableObj
*
pCTable
=
(
SChild
TableObj
*
)
pTable
;
SC
TableObj
*
pCTable
=
(
SC
TableObj
*
)
pTable
;
if
(
pVgroup
->
vgId
==
pCTable
->
vgId
)
{
return
true
;
}
else
{
...
...
@@ -791,7 +791,7 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
return
numOfRows
;
}
void
mnodeAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SC
hild
TableObj
*
pTable
)
{
void
mnodeAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SCTableObj
*
pTable
)
{
int32_t
idPoolSize
=
taosIdPoolMaxSize
(
pVgroup
->
idPool
);
if
(
pTable
->
tid
>
idPoolSize
)
{
mnodeAllocVgroupIdPool
(
pVgroup
);
...
...
@@ -807,7 +807,7 @@ void mnodeAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
}
}
void
mnodeRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SC
hild
TableObj
*
pTable
)
{
void
mnodeRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SCTableObj
*
pTable
)
{
if
(
pTable
->
tid
>=
1
)
{
taosFreeId
(
pVgroup
->
idPool
,
pTable
->
tid
);
pVgroup
->
numOfTables
--
;
...
...
src/plugins/http/src/httpQueue.c
浏览文件 @
e4e86609
...
...
@@ -49,7 +49,7 @@ static taos_queue tsHttpQueue;
void
httpDispatchToResultQueue
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
numOfRows
,
void
(
*
fp
)(
void
*
param
,
void
*
result
,
int32_t
numOfRows
))
{
if
(
tsHttpQueue
!=
NULL
)
{
SHttpResult
*
pMsg
=
(
SHttpResult
*
)
taosAllocateQitem
(
sizeof
(
SHttpResult
));
SHttpResult
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SHttpResult
));
pMsg
->
param
=
param
;
pMsg
->
result
=
result
;
pMsg
->
numOfRows
=
numOfRows
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录