Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
56743e89
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
56743e89
编写于
6月 03, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/2.0tsdb' of
https://github.com/taosdata/TDengine
into feature/2.0tsdb
上级
e016a975
f81120e3
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
81 addition
and
40 deletion
+81
-40
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+1
-4
src/common/inc/tdataformat.h
src/common/inc/tdataformat.h
+11
-9
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+4
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+3
-1
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+1
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+3
-1
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+48
-22
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+1
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+8
-1
未找到文件。
src/client/src/tscUtil.c
浏览文件 @
56743e89
...
...
@@ -651,6 +651,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
SDataRow
trow
=
(
SDataRow
)
pDataBlock
;
dataRowSetLen
(
trow
,
TD_DATA_ROW_HEAD_SIZE
+
flen
);
dataRowSetVersion
(
trow
,
pTableMeta
->
sversion
);
int
toffset
=
0
;
for
(
int32_t
j
=
0
;
j
<
tinfo
.
numOfColumns
;
j
++
)
{
...
...
@@ -759,10 +760,6 @@ void tscCloseTscObj(STscObj* pObj) {
taosTmrStopA
(
&
(
pObj
->
pTimer
));
tscFreeSqlObj
(
pSql
);
if
(
pSql
)
{
sem_destroy
(
&
pSql
->
rspSem
);
}
pthread_mutex_destroy
(
&
pObj
->
mutex
);
if
(
pObj
->
pDnodeConn
!=
NULL
)
{
...
...
src/common/inc/tdataformat.h
浏览文件 @
56743e89
...
...
@@ -119,22 +119,24 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// ----------------- Data row structure
/* A data row, the format is like below:
* |<------------------------------------- len ---------------------------------->|
* |<--
Head
->|<--------- flen -------------->| |
* +----------+---------------------------------+---------------------------------+
* | int
32_t
| | |
* +----------+---------------------------------+---------------------------------+
* | len | First part | Second part |
* +----------+---------------------------------+---------------------------------+
* |<--------------------
+----------
----------------- len ---------------------------------->|
* |<--
Head -
->|<--------- flen -------------->| |
* +----------
-----------
+---------------------------------+---------------------------------+
* | int
16_t | int16_t
| | |
* +----------+----------
+----------
-----------------------+---------------------------------+
* | len |
sversion |
First part | Second part |
* +----------+----------
+----------
-----------------------+---------------------------------+
*/
typedef
void
*
SDataRow
;
#define TD_DATA_ROW_HEAD_SIZE sizeof(int
32_t)
#define TD_DATA_ROW_HEAD_SIZE sizeof(int
16_t)*2
#define dataRowLen(r) (*(int32_t *)(r))
#define dataRowLen(r) (*(int16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
...
...
src/common/src/tdataformat.c
浏览文件 @
56743e89
...
...
@@ -159,7 +159,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
/**
* Initialize a data row
*/
void
tdInitDataRow
(
SDataRow
row
,
STSchema
*
pSchema
)
{
dataRowSetLen
(
row
,
TD_DATA_ROW_HEAD_SIZE
+
schemaFLen
(
pSchema
));
}
void
tdInitDataRow
(
SDataRow
row
,
STSchema
*
pSchema
)
{
dataRowSetLen
(
row
,
TD_DATA_ROW_HEAD_SIZE
+
schemaFLen
(
pSchema
));
dataRowSetVersion
(
row
,
schemaVersion
(
pSchema
));
}
SDataRow
tdNewDataRowFromSchema
(
STSchema
*
pSchema
)
{
int32_t
size
=
dataRowMaxBytesFromSchema
(
pSchema
);
...
...
src/inc/taosmsg.h
浏览文件 @
56743e89
...
...
@@ -602,6 +602,7 @@ typedef struct {
}
SMDVnodeDesc
;
typedef
struct
{
char
db
[
TSDB_DB_NAME_LEN
+
1
];
SMDVnodeCfg
cfg
;
SMDVnodeDesc
nodes
[
TSDB_MAX_REPLICA
];
}
SMDCreateVnodeMsg
;
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
56743e89
...
...
@@ -539,6 +539,8 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
SMDCreateVnodeMsg
*
pVnode
=
rpcMallocCont
(
sizeof
(
SMDCreateVnodeMsg
));
if
(
pVnode
==
NULL
)
return
NULL
;
strcpy
(
pVnode
->
db
,
pVgroup
->
dbName
);
SMDVnodeCfg
*
pCfg
=
&
pVnode
->
cfg
;
pCfg
->
vgId
=
htonl
(
pVgroup
->
vgId
);
pCfg
->
cfgVersion
=
htonl
(
pDb
->
cfgVersion
);
...
...
@@ -594,7 +596,7 @@ SRpcIpSet mnodeGetIpSetFromIp(char *ep) {
}
void
mnodeSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vgId:%d, send create vnode:%d msg, ahandle:%p
"
,
pVgroup
->
vgId
,
pVgroup
->
vgId
,
ahandl
e
);
mTrace
(
"vgId:%d, send create vnode:%d msg, ahandle:%p
db:%s"
,
pVgroup
->
vgId
,
pVgroup
->
vgId
,
ahandle
,
pVgroup
->
dbNam
e
);
SMDCreateVnodeMsg
*
pCreate
=
mnodeBuildCreateVnodeMsg
(
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
56743e89
...
...
@@ -211,7 +211,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace
(
"%s TCP server socket was shutdown, exiting..."
,
pServerObj
->
label
);
break
;
}
tError
(
"%s TCP accept failure(%s)"
,
pServerObj
->
label
,
errno
,
strerror
(
errno
));
tError
(
"%s TCP accept failure(%s)"
,
pServerObj
->
label
,
strerror
(
errno
));
continue
;
}
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
56743e89
...
...
@@ -69,11 +69,13 @@ typedef struct {
}
SMemTable
;
// ---------- TSDB TABLE DEFINITION
#define TSDB_MAX_TABLE_SCHEMAS 16
typedef
struct
STable
{
int8_t
type
;
STableId
tableId
;
uint64_t
superUid
;
// Super table UID
STSchema
*
schema
;
int16_t
numOfSchemas
;
STSchema
**
schema
;
STSchema
*
tagSchema
;
SKVRow
tagVal
;
SMemTable
*
mem
;
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
56743e89
...
...
@@ -37,18 +37,24 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
ptr
=
(
char
*
)
ptr
+
sizeof
(
int
);
memcpy
(
ptr
,
varDataVal
(
pTable
->
name
),
varDataLen
(
pTable
->
name
));
ptr
=
(
char
*
)
ptr
+
varDataLen
(
pTable
->
name
);
T_APPEND_MEMBER
(
ptr
,
&
(
pTable
->
tableId
),
STableId
,
uid
);
T_APPEND_MEMBER
(
ptr
,
&
(
pTable
->
tableId
),
STableId
,
tid
);
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
superUid
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
);
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
numOfSchemas
);
for
(
int
i
=
0
;
i
<
pTable
->
numOfSchemas
;
i
++
)
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
[
i
]);
}
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
tagSchema
);
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
ptr
=
tdEncodeKVRow
(
ptr
,
pTable
->
tagVal
);
}
else
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
);
T_APPEND_MEMBER
(
ptr
,
pTable
,
STable
,
numOfSchemas
);
for
(
int
i
=
0
;
i
<
pTable
->
numOfSchemas
;
i
++
)
{
ptr
=
tdEncodeSchema
(
ptr
,
pTable
->
schema
[
i
]);
}
}
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
...
...
@@ -71,6 +77,11 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
STable
*
tsdbDecodeTable
(
void
*
cont
,
int
contLen
)
{
STable
*
pTable
=
(
STable
*
)
calloc
(
1
,
sizeof
(
STable
));
if
(
pTable
==
NULL
)
return
NULL
;
pTable
->
schema
=
(
STSchema
**
)
malloc
(
sizeof
(
STSchema
*
)
*
TSDB_MAX_TABLE_SCHEMAS
);
if
(
pTable
->
schema
==
NULL
)
{
free
(
pTable
);
return
NULL
;
}
void
*
ptr
=
cont
;
T_READ_MEMBER
(
ptr
,
int8_t
,
pTable
->
type
);
...
...
@@ -78,28 +89,34 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
ptr
=
(
char
*
)
ptr
+
sizeof
(
int
);
pTable
->
name
=
calloc
(
1
,
len
+
VARSTR_HEADER_SIZE
+
1
);
if
(
pTable
->
name
==
NULL
)
return
NULL
;
varDataSetLen
(
pTable
->
name
,
len
);
memcpy
(
pTable
->
name
->
data
,
ptr
,
len
);
ptr
=
(
char
*
)
ptr
+
len
;
T_READ_MEMBER
(
ptr
,
uint64_t
,
pTable
->
tableId
.
uid
);
T_READ_MEMBER
(
ptr
,
int32_t
,
pTable
->
tableId
.
tid
);
T_READ_MEMBER
(
ptr
,
uint64_t
,
pTable
->
superUid
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
pTable
->
schema
=
tdDecodeSchema
(
&
ptr
);
T_READ_MEMBER
(
ptr
,
int16_t
,
pTable
->
numOfSchemas
);
for
(
int
i
=
0
;
i
<
pTable
->
numOfSchemas
;
i
++
)
{
pTable
->
schema
[
i
]
=
tdDecodeSchema
(
&
ptr
);
}
pTable
->
tagSchema
=
tdDecodeSchema
(
&
ptr
);
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
ptr
=
tdDecodeKVRow
(
ptr
,
&
pTable
->
tagVal
);
}
else
{
pTable
->
schema
=
tdDecodeSchema
(
&
ptr
);
T_READ_MEMBER
(
ptr
,
int16_t
,
pTable
->
numOfSchemas
);
for
(
int
i
=
0
;
i
<
pTable
->
numOfSchemas
;
i
++
)
{
pTable
->
schema
[
i
]
=
tdDecodeSchema
(
&
ptr
);
}
}
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
ptr
=
taosDecodeString
(
ptr
,
&
(
pTable
->
sql
));
}
pTable
->
lastKey
=
TSKEY_INITIAL_VAL
;
return
pTable
;
}
...
...
@@ -221,13 +238,14 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
return
0
;
}
// Get the newest table schema
STSchema
*
tsdbGetTableSchema
(
STsdbMeta
*
pMeta
,
STable
*
pTable
)
{
if
(
pTable
->
type
==
TSDB_NORMAL_TABLE
||
pTable
->
type
==
TSDB_SUPER_TABLE
||
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
return
pTable
->
schema
;
return
pTable
->
schema
[
pTable
->
numOfSchemas
-
1
]
;
}
else
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
STable
*
pSuper
=
tsdbGetTableByUid
(
pMeta
,
pTable
->
superUid
);
if
(
pSuper
==
NULL
)
return
NULL
;
return
pSuper
->
schema
;
return
pSuper
->
schema
[
pSuper
->
numOfSchemas
-
1
]
;
}
else
{
return
NULL
;
}
...
...
@@ -287,13 +305,16 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
}
pTable
->
type
=
pCfg
->
type
;
pTable
->
numOfSchemas
=
0
;
if
(
isSuper
)
{
pTable
->
type
=
TSDB_SUPER_TABLE
;
pTable
->
tableId
.
uid
=
pCfg
->
superUid
;
pTable
->
tableId
.
tid
=
-
1
;
pTable
->
superUid
=
TSDB_INVALID_SUPER_TABLE_ID
;
pTable
->
schema
=
tdDupSchema
(
pCfg
->
schema
);
pTable
->
schema
=
(
STSchema
**
)
malloc
(
sizeof
(
STSchema
*
)
*
TSDB_MAX_TABLE_SCHEMAS
);
pTable
->
numOfSchemas
=
1
;
pTable
->
schema
[
0
]
=
tdDupSchema
(
pCfg
->
schema
);
pTable
->
tagSchema
=
tdDupSchema
(
pCfg
->
tagSchema
);
tsize
=
strnlen
(
pCfg
->
sname
,
TSDB_TABLE_NAME_LEN
);
...
...
@@ -328,14 +349,18 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
if
(
pCfg
->
type
==
TSDB_CHILD_TABLE
)
{
pTable
->
superUid
=
pCfg
->
superUid
;
pTable
->
tagVal
=
tdKVRowDup
(
pCfg
->
tagValues
);
}
else
if
(
pCfg
->
type
==
TSDB_NORMAL_TABLE
)
{
pTable
->
superUid
=
-
1
;
pTable
->
schema
=
tdDupSchema
(
pCfg
->
schema
);
}
else
{
ASSERT
(
pCfg
->
type
==
TSDB_STREAM_TABLE
);
pTable
->
superUid
=
-
1
;
pTable
->
schema
=
tdDupSchema
(
pCfg
->
schema
);
pTable
->
sql
=
strdup
(
pCfg
->
sql
);
pTable
->
schema
=
(
STSchema
**
)
malloc
(
sizeof
(
STSchema
*
)
*
TSDB_MAX_TABLE_SCHEMAS
);
pTable
->
numOfSchemas
=
1
;
pTable
->
schema
[
0
]
=
tdDupSchema
(
pCfg
->
schema
);
if
(
pCfg
->
type
==
TSDB_NORMAL_TABLE
)
{
pTable
->
superUid
=
-
1
;
}
else
{
ASSERT
(
pCfg
->
type
==
TSDB_STREAM_TABLE
);
pTable
->
superUid
=
-
1
;
pTable
->
sql
=
strdup
(
pCfg
->
sql
);
}
}
}
...
...
@@ -568,7 +593,7 @@ static int tsdbFreeTable(STable *pTable) {
if
(
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
kvRowFree
(
pTable
->
tagVal
);
}
else
{
tdFreeSchema
(
pTable
->
schema
);
for
(
int
i
=
0
;
i
<
pTable
->
numOfSchemas
;
i
++
)
tdFreeSchema
(
pTable
->
schema
[
i
]
);
}
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
...
...
@@ -630,9 +655,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
}
// Update the pMeta->maxCols and pMeta->maxRowBytes
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
||
pTable
->
type
==
TSDB_NORMAL_TABLE
)
{
if
(
schemaNCols
(
pTable
->
schema
)
>
pMeta
->
maxCols
)
pMeta
->
maxCols
=
schemaNCols
(
pTable
->
schema
);
int
bytes
=
dataRowMaxBytesFromSchema
(
pTable
->
schema
);
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
||
pTable
->
type
==
TSDB_NORMAL_TABLE
||
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
if
(
schemaNCols
(
pTable
->
schema
[
pTable
->
numOfSchemas
-
1
])
>
pMeta
->
maxCols
)
pMeta
->
maxCols
=
schemaNCols
(
pTable
->
schema
[
pTable
->
numOfSchemas
-
1
]);
int
bytes
=
dataRowMaxBytesFromSchema
(
pTable
->
schema
[
pTable
->
numOfSchemas
-
1
]);
if
(
bytes
>
pMeta
->
maxRowBytes
)
pMeta
->
maxRowBytes
=
bytes
;
}
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
56743e89
...
...
@@ -51,6 +51,7 @@ typedef struct {
SSyncCfg
syncCfg
;
SWalCfg
walCfg
;
char
*
rootDir
;
char
db
[
TSDB_DB_NAME_LEN
+
1
];
}
SVnodeObj
;
int
vnodeWriteToQueue
(
void
*
param
,
void
*
pHead
,
int
type
);
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
56743e89
...
...
@@ -498,7 +498,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
db
\"
:
\"
%s
\"
,
\n
"
,
pVnodeCfg
->
db
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cfgVersion
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
cfgVersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cacheBlockSize
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
cacheBlockSize
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
totalBlocks
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
totalBlocks
);
...
...
@@ -570,6 +570,13 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
goto
PARSE_OVER
;
}
cJSON
*
db
=
cJSON_GetObjectItem
(
root
,
"db"
);
if
(
!
db
||
db
->
type
!=
cJSON_String
||
db
->
valuestring
==
NULL
)
{
vError
(
"vgId:%d, failed to read vnode cfg, db not found"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
strcpy
(
pVnode
->
db
,
db
->
valuestring
);
cJSON
*
cfgVersion
=
cJSON_GetObjectItem
(
root
,
"cfgVersion"
);
if
(
!
cfgVersion
||
cfgVersion
->
type
!=
cJSON_Number
)
{
vError
(
"vgId:%d, failed to read vnode cfg, cfgVersion not found"
,
pVnode
->
vgId
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录