Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
34eb31ce
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
34eb31ce
编写于
5月 16, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
5月 16, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1936 from taosdata/feature/mwrite
Feature/mwrite
上级
f49d6b17
9f2237a1
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
167 addition
and
109 deletion
+167
-109
.gitignore
.gitignore
+3
-0
src/inc/taosdef.h
src/inc/taosdef.h
+1
-1
src/mnode/inc/mgmtDef.h
src/mnode/inc/mgmtDef.h
+1
-1
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+2
-1
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+80
-57
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+79
-48
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-1
未找到文件。
.gitignore
浏览文件 @
34eb31ce
...
...
@@ -33,5 +33,8 @@ Target/
*.failed
*.sql
sim/
psim/
pysim/
*.out
*DS_Store
src/inc/taosdef.h
浏览文件 @
34eb31ce
...
...
@@ -236,7 +236,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_
SQLCMD
_SIZE 1024
#define TSDB_
CQ_SQL
_SIZE 1024
#define TSDB_MAX_VNODES 256
#define TSDB_MIN_VNODES 50
#define TSDB_INVALID_VNODE_NUM 0
...
...
src/mnode/inc/mgmtDef.h
浏览文件 @
34eb31ce
...
...
@@ -68,7 +68,7 @@ typedef struct SMnodeObj {
// todo use dynamic length string
typedef
struct
{
char
tableId
[
TSDB_TABLE_ID_LEN
+
1
]
;
char
*
tableId
;
int8_t
type
;
}
STableObj
;
...
...
src/mnode/inc/mgmtSdb.h
浏览文件 @
34eb31ce
...
...
@@ -35,7 +35,8 @@ typedef enum {
typedef
enum
{
SDB_KEY_STRING
,
SDB_KEY_INT
,
SDB_KEY_AUTO
SDB_KEY_AUTO
,
SDB_KEY_VAR_STRING
,
}
ESdbKey
;
typedef
enum
{
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
34eb31ce
...
...
@@ -104,6 +104,14 @@ bool sdbIsServing() {
return
tsSdbObj
.
status
==
SDB_STATUS_SERVING
;
}
static
void
*
sdbGetObjKey
(
SSdbTable
*
pTable
,
void
*
key
)
{
if
(
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
return
*
(
char
**
)
key
;
}
return
key
;
}
static
char
*
sdbGetActionStr
(
int32_t
action
)
{
switch
(
action
)
{
case
SDB_ACTION_INSERT
:
...
...
@@ -116,20 +124,25 @@ static char *sdbGetActionStr(int32_t action) {
return
"invalid"
;
}
static
char
*
sdbGet
keyStr
(
SSdbTable
*
pTable
,
void
*
row
)
{
static
char
*
sdbGet
KeyStr
(
SSdbTable
*
pTable
,
void
*
key
)
{
static
char
str
[
16
];
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_STRING
:
return
(
char
*
)
row
;
case
SDB_KEY_VAR_STRING
:
return
(
char
*
)
key
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
row
);
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
key
);
return
str
;
default:
return
"invalid"
;
}
}
static
char
*
sdbGetKeyStrFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetKeyStr
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
static
void
*
sdbGetTableFromId
(
int32_t
tableId
)
{
return
tsSdbObj
.
tableList
[
tableId
];
}
...
...
@@ -332,50 +345,48 @@ void sdbCleanUp() {
pthread_mutex_destroy
(
&
tsSdbObj
.
mutex
);
}
void
sdbIncRef
(
void
*
handle
,
void
*
pRow
)
{
if
(
pRow
)
{
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
atomic_add_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, add ref to record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
}
void
sdbIncRef
(
void
*
handle
,
void
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
atomic_add_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, add ref to record:%s:%d"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
}
}
void
sdbDecRef
(
void
*
handle
,
void
*
pRow
)
{
if
(
pRow
)
{
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, def ref of record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
}
int8_t
*
updateEnd
=
pRow
+
pTable
->
refCountPos
-
1
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"table:%s, record:%s:%s:%d is destroyed"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
SSdbOper
oper
=
{.
pObj
=
pRow
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
void
sdbDecRef
(
void
*
handle
,
void
*
pObj
)
{
if
(
pObj
==
NULL
)
return
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pObj
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
if
(
0
&&
(
pTable
->
tableId
==
SDB_TABLE_MNODE
||
pTable
->
tableId
==
SDB_TABLE_DNODE
))
{
sdbTrace
(
"table:%s, def ref of record:%s:%d"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
}
}
static
SSdbRow
*
sdbGetRowMeta
(
void
*
handle
,
void
*
key
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbRow
*
pMeta
;
int8_t
*
updateEnd
=
pObj
+
pTable
->
refCountPos
-
1
;
if
(
refCount
<=
0
&&
*
updateEnd
)
{
sdbTrace
(
"table:%s, record:%s:%d is destroyed"
,
pTable
->
tableName
,
sdbGetKeyStrFromObj
(
pTable
,
pObj
),
*
pRefCount
);
SSdbOper
oper
=
{.
pObj
=
pObj
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
}
if
(
handle
==
NULL
)
return
NULL
;
static
SSdbRow
*
sdbGetRowMeta
(
SSdbTable
*
pTable
,
void
*
key
)
{
if
(
pTable
==
NULL
)
return
NULL
;
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
pMeta
=
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
return
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
}
return
pMeta
;
static
SSdbRow
*
sdbGetRowMetaFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetRowMeta
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
)
{
...
...
@@ -387,7 +398,7 @@ void *sdbGetRow(void *handle, void *key) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
pMeta
=
taosHashGet
(
pTable
->
iHandle
,
key
,
keySize
);
...
...
@@ -400,6 +411,10 @@ void *sdbGetRow(void *handle, void *key) {
return
pMeta
->
row
;
}
static
void
*
sdbGetRowFromObj
(
SSdbTable
*
pTable
,
void
*
key
)
{
return
sdbGetRow
(
pTable
,
sdbGetObjKey
(
pTable
,
key
));
}
static
int32_t
sdbInsertHash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
SSdbRow
rowMeta
;
rowMeta
.
rowSize
=
pOper
->
rowSize
;
...
...
@@ -407,11 +422,14 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
keySize
=
strlen
((
char
*
)
pOper
->
pObj
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
taosHashPut
(
pTable
->
iHandle
,
pOper
->
pObj
,
keySize
,
&
rowMeta
,
sizeof
(
SSdbRow
));
taosHashPut
(
pTable
->
iHandle
,
key
,
keySize
,
&
rowMeta
,
sizeof
(
SSdbRow
));
sdbIncRef
(
pTable
,
pOper
->
pObj
);
pTable
->
numOfRows
++
;
...
...
@@ -425,7 +443,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, insert record:%s to hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
(
*
pTable
->
insertFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -436,17 +454,20 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
pthread_mutex_lock
(
&
pTable
->
mutex
);
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
sizeof
(
int32_t
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
)
{
keySize
=
strlen
((
char
*
)
pOper
->
pObj
);
if
(
pTable
->
keyType
==
SDB_KEY_STRING
||
pTable
->
keyType
==
SDB_KEY_VAR_STRING
)
{
keySize
=
strlen
((
char
*
)
key
);
}
taosHashRemove
(
pTable
->
iHandle
,
pOper
->
pObj
,
keySize
);
taosHashRemove
(
pTable
->
iHandle
,
key
,
keySize
);
pTable
->
numOfRows
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
sdbTrace
(
"table:%s, delete record:%s from hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
int8_t
*
updateEnd
=
pOper
->
pObj
+
pTable
->
refCountPos
-
1
;
*
updateEnd
=
1
;
...
...
@@ -457,7 +478,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper) {
static
int32_t
sdbUpdateHash
(
SSdbTable
*
pTable
,
SSdbOper
*
pOper
)
{
sdbTrace
(
"table:%s, update record:%s in hash, numOfRows:%d version:%"
PRIu64
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
),
pTable
->
numOfRows
,
sdbGetVersion
());
(
*
pTable
->
updateFp
)(
pOper
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -488,7 +509,7 @@ static int sdbWrite(void *param, void *data, int type) {
}
else
if
(
pHead
->
version
!=
tsSdbObj
.
version
+
1
)
{
pthread_mutex_unlock
(
&
tsSdbObj
.
mutex
);
sdbError
(
"table:%s, failed to restore %s record:%s from wal, version:%"
PRId64
" too large, sdb version:%"
PRId64
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGet
k
eyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
pTable
->
tableName
,
sdbGetActionStr
(
action
),
sdbGet
K
eyStr
(
pTable
,
pHead
->
cont
),
pHead
->
version
,
tsSdbObj
.
version
);
return
TSDB_CODE_OTHERS
;
}
else
{
...
...
@@ -540,8 +561,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
if
(
sdbGetRow
(
pTable
,
pOper
->
pObj
))
{
sdbError
(
"table:%s, failed to insert record:%s, already exist"
,
pTable
->
tableName
,
sdbGet
keyStr
(
pTable
,
pOper
->
pObj
));
if
(
sdbGetRow
FromObj
(
pTable
,
pOper
->
pObj
))
{
sdbError
(
"table:%s, failed to insert record:%s, already exist"
,
pTable
->
tableName
,
sdbGet
KeyStrFromObj
(
pTable
,
pOper
->
pObj
));
sdbDecRef
(
pTable
,
pOper
->
pObj
);
return
TSDB_CODE_ALREADY_THERE
;
}
...
...
@@ -580,7 +601,7 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
SSdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
SSdbRow
*
pMeta
=
sdbGetRowMeta
FromObj
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
@@ -590,25 +611,27 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
assert
(
pMetaRow
!=
NULL
);
if
(
pOper
->
type
==
SDB_OPER_GLOBAL
)
{
int32_t
rowSize
=
0
;
void
*
key
=
sdbGetObjKey
(
pTable
,
pOper
->
pObj
);
int32_t
keySize
=
0
;
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_STRING
:
rowSize
=
strlen
((
char
*
)
pOper
->
pObj
)
+
1
;
case
SDB_KEY_VAR_STRING
:
keySize
=
strlen
((
char
*
)
key
)
+
1
;
break
;
case
SDB_KEY_INT
:
case
SDB_KEY_AUTO
:
rowSize
=
sizeof
(
uint64
_t
);
keySize
=
sizeof
(
uint32
_t
);
break
;
default:
return
-
1
;
}
int32_t
size
=
sizeof
(
SWalHead
)
+
row
Size
;
int32_t
size
=
sizeof
(
SWalHead
)
+
key
Size
;
SWalHead
*
pHead
=
taosAllocateQitem
(
size
);
pHead
->
version
=
0
;
pHead
->
len
=
row
Size
;
pHead
->
len
=
key
Size
;
pHead
->
msgType
=
pTable
->
tableId
*
10
+
SDB_ACTION_DELETE
;
memcpy
(
pHead
->
cont
,
pOper
->
pObj
,
row
Size
);
memcpy
(
pHead
->
cont
,
key
,
key
Size
);
int32_t
code
=
sdbWrite
(
pOper
,
pHead
,
pHead
->
msgType
);
taosFreeQitem
(
pHead
);
...
...
@@ -622,7 +645,7 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
if
(
pTable
==
NULL
)
return
-
1
;
SSdbRow
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
SSdbRow
*
pMeta
=
sdbGetRowMeta
FromObj
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableName
);
return
-
1
;
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
34eb31ce
...
...
@@ -84,6 +84,7 @@ static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg);
static
int32_t
mgmtFindSuperTableColumnIndex
(
SSuperTableObj
*
pStable
,
char
*
colName
);
static
void
mgmtDestroyChildTable
(
SChildTableObj
*
pTable
)
{
tfree
(
pTable
->
info
.
tableId
);
tfree
(
pTable
->
schema
);
tfree
(
pTable
->
sql
);
tfree
(
pTable
);
...
...
@@ -180,6 +181,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
SChildTableObj
*
pNew
=
pOper
->
pObj
;
SChildTableObj
*
pTable
=
mgmtGetChildTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSql
=
pTable
->
sql
;
void
*
oldSchema
=
pTable
->
schema
;
memcpy
(
pTable
,
pNew
,
pOper
->
rowSize
);
...
...
@@ -188,6 +190,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
free
(
pNew
);
free
(
oldSql
);
free
(
oldSchema
);
free
(
oldTableId
);
}
mgmtDecTableRef
(
pTable
);
...
...
@@ -195,51 +198,66 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mgmtChildTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
);
SChildTableObj
*
pTable
=
pOper
->
pObj
;
assert
(
pTable
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
memcpy
(
pOper
->
rowData
,
pTable
,
tsChildTableUpdateSize
);
pOper
->
rowSize
=
tsChildTableUpdateSize
;
}
else
{
int32_t
len
=
strlen
(
pTable
->
info
.
tableId
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
memcpy
(
pOper
->
rowData
,
pTable
->
info
.
tableId
,
len
);
memset
(
pOper
->
rowData
+
len
,
0
,
1
);
len
++
;
memcpy
(
pOper
->
rowData
+
len
,
(
char
*
)
pTable
+
sizeof
(
char
*
),
tsChildTableUpdateSize
);
len
+=
tsChildTableUpdateSize
;
if
(
pTable
->
info
.
type
!=
TSDB_CHILD_TABLE
)
{
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
if
(
maxRowSize
<
tsChildTableUpdateSize
+
schemaSize
)
{
return
TSDB_CODE_INVALID_MSG_LEN
;
memcpy
(
pOper
->
rowData
+
len
,
pTable
->
schema
,
schemaSize
);
len
+=
schemaSize
;
if
(
pTable
->
sqlLen
!=
0
)
{
memcpy
(
pOper
->
rowData
+
len
,
pTable
->
sql
,
pTable
->
sqlLen
);
len
+=
pTable
->
sqlLen
;
}
memcpy
(
pOper
->
rowData
,
pTable
,
tsChildTableUpdateSize
);
memcpy
(
pOper
->
rowData
+
tsChildTableUpdateSize
,
pTable
->
schema
,
schemaSize
);
memcpy
(
pOper
->
rowData
+
tsChildTableUpdateSize
+
schemaSize
,
pTable
->
sql
,
pTable
->
sqlLen
);
pOper
->
rowSize
=
tsChildTableUpdateSize
+
schemaSize
+
pTable
->
sqlLen
;
}
pOper
->
rowSize
=
len
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtChildTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SChildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChildTableObj
));
if
(
pTable
==
NULL
)
{
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
if
(
pTable
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
memcpy
(
pTable
,
pOper
->
rowData
,
tsChildTableUpdateSize
);
int32_t
len
=
strlen
(
pOper
->
rowData
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
pTable
->
info
.
tableId
=
strdup
(
pOper
->
rowData
);
len
++
;
memcpy
((
char
*
)
pTable
+
sizeof
(
char
*
),
pOper
->
rowData
+
len
,
tsChildTableUpdateSize
);
len
+=
tsChildTableUpdateSize
;
if
(
pTable
->
info
.
type
!=
TSDB_CHILD_TABLE
)
{
int32_t
schemaSize
=
pTable
->
numOfColumns
*
sizeof
(
SSchema
);
pTable
->
schema
=
(
SSchema
*
)
malloc
(
schemaSize
);
if
(
pTable
->
schema
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_
SERV_OUT_OF_MEMORY
;
return
TSDB_CODE_
INVALID_TABLE_TYPE
;
}
memcpy
(
pTable
->
schema
,
pOper
->
rowData
+
tsChildTableUpdateSize
,
schemaSize
);
memcpy
(
pTable
->
schema
,
pOper
->
rowData
+
len
,
schemaSize
);
len
+=
schemaSize
;
pTable
->
sql
=
(
char
*
)
malloc
(
pTable
->
sqlLen
);
if
(
pTable
->
sql
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
if
(
pTable
->
sqlLen
!=
0
)
{
pTable
->
sql
=
malloc
(
pTable
->
sqlLen
);
if
(
pTable
->
sql
==
NULL
)
{
mgmtDestroyChildTable
(
pTable
);
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
}
memcpy
(
pTable
->
sql
,
pOper
->
rowData
+
len
,
pTable
->
sqlLen
);
}
memcpy
(
pTable
->
sql
,
pOper
->
rowData
+
tsChildTableUpdateSize
+
schemaSize
,
pTable
->
sqlLen
);
}
pOper
->
pObj
=
pTable
;
...
...
@@ -311,15 +329,15 @@ static int32_t mgmtChildTableActionRestored() {
static
int32_t
mgmtInitChildTables
()
{
SChildTableObj
tObj
;
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsChildTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_CTABLE
,
.
tableName
=
"ctables"
,
.
hashSessions
=
tsMaxTables
,
.
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
),
.
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_ID_LEN
+
TSDB_CQ_SQL_SIZE
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
keyType
=
SDB_KEY_
VAR_
STRING
,
.
insertFp
=
mgmtChildTableActionInsert
,
.
deleteFp
=
mgmtChildTableActionDelete
,
.
updateFp
=
mgmtChildTableActionUpdate
,
...
...
@@ -372,6 +390,7 @@ static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
taosHashCleanup
(
pStable
->
vgHash
);
pStable
->
vgHash
=
NULL
;
}
tfree
(
pStable
->
info
.
tableId
);
tfree
(
pStable
->
schema
);
tfree
(
pStable
);
}
...
...
@@ -408,11 +427,13 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
SSuperTableObj
*
pNew
=
pOper
->
pObj
;
SSuperTableObj
*
pTable
=
mgmtGetSuperTable
(
pNew
->
info
.
tableId
);
if
(
pTable
!=
pNew
)
{
void
*
oldTableId
=
pTable
->
info
.
tableId
;
void
*
oldSchema
=
pTable
->
schema
;
memcpy
(
pTable
,
pNew
,
pOper
->
rowSize
);
pTable
->
schema
=
pNew
->
schema
;
free
(
pNew
->
vgHash
);
free
(
pNew
);
free
(
oldTableId
);
free
(
oldSchema
);
}
mgmtDecTableRef
(
pTable
);
...
...
@@ -420,40 +441,50 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
}
static
int32_t
mgmtSuperTableActionEncode
(
SSdbOper
*
pOper
)
{
const
int32_t
maxRowSize
=
sizeof
(
SChildTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
);
SSuperTableObj
*
pStable
=
pOper
->
pObj
;
assert
(
pOper
->
pObj
!=
NULL
&&
pOper
->
rowData
!=
NULL
);
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
int32_t
len
=
strlen
(
pStable
->
info
.
tableId
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
len
=
TSDB_CODE_INVALID_TABLE_ID
;
if
(
maxRowSize
<
tsSuperTableUpdateSize
+
schemaSize
)
{
return
TSDB_CODE_INVALID_MSG_LEN
;
}
memcpy
(
pOper
->
rowData
,
pStable
->
info
.
tableId
,
len
);
memset
(
pOper
->
rowData
+
len
,
0
,
1
);
len
++
;
memcpy
(
pOper
->
rowData
+
len
,
(
char
*
)
pStable
+
sizeof
(
char
*
),
tsSuperTableUpdateSize
);
len
+=
tsSuperTableUpdateSize
;
memcpy
(
pOper
->
rowData
,
pStable
,
tsSuperTableUpdateSize
);
memcpy
(
pOper
->
rowData
+
tsSuperTableUpdateSize
,
pStable
->
schema
,
schemaSize
);
pOper
->
rowSize
=
tsSuperTableUpdateSize
+
schemaSize
;
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
memcpy
(
pOper
->
rowData
+
len
,
pStable
->
schema
,
schemaSize
);
len
+=
schemaSize
;
pOper
->
rowSize
=
len
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtSuperTableActionDecode
(
SSdbOper
*
pOper
)
{
assert
(
pOper
->
rowData
!=
NULL
);
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuperTableObj
));
if
(
pStable
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
memcpy
(
pStable
,
pOper
->
rowData
,
tsSuperTableUpdateSize
);
int32_t
len
=
strlen
(
pOper
->
rowData
);
if
(
len
>
TSDB_TABLE_ID_LEN
)
return
TSDB_CODE_INVALID_TABLE_ID
;
pStable
->
info
.
tableId
=
strdup
(
pOper
->
rowData
);
len
++
;
memcpy
((
char
*
)
pStable
+
sizeof
(
char
*
),
pOper
->
rowData
+
len
,
tsSuperTableUpdateSize
);
len
+=
tsSuperTableUpdateSize
;
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfColumns
+
pStable
->
numOfTags
);
pStable
->
schema
=
malloc
(
schemaSize
);
if
(
pStable
->
schema
==
NULL
)
{
mgmtDestroySuperTable
(
pStable
);
return
-
1
;
return
TSDB_CODE_NOT_SUPER_TABLE
;
}
memcpy
(
pStable
->
schema
,
pOper
->
rowData
+
tsSuperTableUpdateSize
,
schemaSize
);
memcpy
(
pStable
->
schema
,
pOper
->
rowData
+
len
,
schemaSize
);
pOper
->
pObj
=
pStable
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -465,15 +496,15 @@ static int32_t mgmtSuperTableActionRestored() {
static
int32_t
mgmtInitSuperTables
()
{
SSuperTableObj
tObj
;
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
tsSuperTableUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
.
info
.
type
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_STABLE
,
.
tableName
=
"stables"
,
.
hashSessions
=
TSDB_MAX_SUPER_TABLES
,
.
maxRowSize
=
tsSuperTableUpdateSize
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
,
.
maxRowSize
=
sizeof
(
SSuperTableObj
)
+
sizeof
(
SSchema
)
*
(
TSDB_MAX_TAGS
+
TSDB_MAX_COLUMNS
+
16
)
+
TSDB_TABLE_ID_LEN
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
keyType
=
SDB_KEY_
VAR_
STRING
,
.
insertFp
=
mgmtSuperTableActionInsert
,
.
deleteFp
=
mgmtSuperTableActionDelete
,
.
updateFp
=
mgmtSuperTableActionUpdate
,
...
...
@@ -720,14 +751,14 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
static
void
mgmtProcessCreateSuperTableMsg
(
SQueuedMsg
*
pMsg
)
{
SCMCreateTableMsg
*
pCreate
=
pMsg
->
pCont
;
SSuperTableObj
*
pStable
=
(
SSuperTableObj
*
)
calloc
(
1
,
sizeof
(
SSuperTableObj
));
SSuperTableObj
*
pStable
=
calloc
(
1
,
sizeof
(
SSuperTableObj
));
if
(
pStable
==
NULL
)
{
mError
(
"table:%s, failed to create, no enough memory"
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
strcpy
(
pStable
->
info
.
tableId
,
pCreate
->
tableId
);
pStable
->
info
.
tableId
=
strdup
(
pCreate
->
tableId
);
pStable
->
info
.
type
=
TSDB_SUPER_TABLE
;
pStable
->
createdTime
=
taosGetTimestampMs
();
pStable
->
uid
=
(((
uint64_t
)
pStable
->
createdTime
)
<<
16
)
+
(
sdbGetVersion
()
&
((
1ul
<<
16
)
-
1ul
));
...
...
@@ -1358,7 +1389,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
}
static
SChildTableObj
*
mgmtDoCreateChildTable
(
SCMCreateTableMsg
*
pCreate
,
SVgObj
*
pVgroup
,
int32_t
tid
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
calloc
(
1
,
sizeof
(
SChildTableObj
));
SChildTableObj
*
pTable
=
calloc
(
1
,
sizeof
(
SChildTableObj
));
if
(
pTable
==
NULL
)
{
mError
(
"table:%s, failed to alloc memory"
,
pCreate
->
tableId
);
terrno
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
...
...
@@ -1371,10 +1402,10 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
pTable
->
info
.
type
=
TSDB_NORMAL_TABLE
;
}
strcpy
(
pTable
->
info
.
tableId
,
pCreate
->
tableId
);
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
sid
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
pTable
->
info
.
tableId
=
strdup
(
pCreate
->
tableId
);
pTable
->
createdTime
=
taosGetTimestampMs
();
pTable
->
sid
=
tid
;
pTable
->
vgId
=
pVgroup
->
vgId
;
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
char
*
pTagData
=
(
char
*
)
pCreate
->
schema
;
// it is a tag key
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
34eb31ce
...
...
@@ -80,7 +80,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
T_READ_MEMBER
(
ptr
,
int8_t
,
pTable
->
type
);
int
len
=
*
(
int
*
)
ptr
;
ptr
=
(
char
*
)
ptr
+
sizeof
(
int
);
pTable
->
name
=
calloc
(
1
,
len
+
VARSTR_HEADER_SIZE
);
pTable
->
name
=
calloc
(
1
,
len
+
VARSTR_HEADER_SIZE
+
1
);
if
(
pTable
->
name
==
NULL
)
return
NULL
;
varDataSetLen
(
pTable
->
name
,
len
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录