Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
KASAKI11
TDengine
提交
a3714edf
T
TDengine
项目概览
KASAKI11
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a3714edf
编写于
7月 15, 2020
作者:
P
Ping Xiao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into jdbcBatchInsert
上级
1ab0582b
381e7df4
变更
37
展开全部
隐藏空白更改
内联
并排
Showing
37 changed file
with
3009 addition
and
1864 deletion
+3009
-1864
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+8
-0
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+1
-0
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+5
-3
src/common/src/tglobal.c
src/common/src/tglobal.c
+11
-0
src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
...jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
+9
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+1
-0
src/inc/taosdef.h
src/inc/taosdef.h
+4
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-0
src/inc/ttokendef.h
src/inc/ttokendef.h
+111
-111
src/inc/twal.h
src/inc/twal.h
+1
-0
src/kit/taosdump/taosdump.c
src/kit/taosdump/taosdump.c
+110
-13
src/kit/taosmigrate/taosmigrateVnodeCfg.c
src/kit/taosmigrate/taosmigrateVnodeCfg.c
+8
-0
src/mnode/inc/mnodeDef.h
src/mnode/inc/mnodeDef.h
+1
-0
src/mnode/src/mnodeDb.c
src/mnode/src/mnodeDb.c
+24
-5
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+5
-3
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+1
-0
src/query/inc/qsqlparser.h
src/query/inc/qsqlparser.h
+1
-0
src/query/inc/sql.y
src/query/inc/sql.y
+3
-0
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+61
-8
src/query/src/qparserImpl.c
src/query/src/qparserImpl.c
+1
-0
src/query/src/qtokenizer.c
src/query/src/qtokenizer.c
+1
-0
src/query/src/sql.c
src/query/src/sql.c
+1137
-1118
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+55
-21
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+75
-88
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+2
-2
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+479
-343
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+20
-11
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+10
-1
src/wal/src/walMain.c
src/wal/src/walMain.c
+73
-14
tests/pytest/crash_gen.py
tests/pytest/crash_gen.py
+149
-100
tests/pytest/util/dnodes-no-random-fail.py
tests/pytest/util/dnodes-no-random-fail.py
+500
-0
tests/pytest/util/dnodes-random-fail.py
tests/pytest/util/dnodes-random-fail.py
+2
-2
tests/script/sh/deploy.sh
tests/script/sh/deploy.sh
+15
-15
tests/script/sh/exec-no-random-fail.sh
tests/script/sh/exec-no-random-fail.sh
+113
-0
tests/script/sh/exec-random-fail.sh
tests/script/sh/exec-random-fail.sh
+1
-1
tests/script/tmp/mnodes.sim
tests/script/tmp/mnodes.sim
+7
-3
tests/test/c/createTablePerformance.c
tests/test/c/createTablePerformance.c
+2
-1
未找到文件。
src/client/src/tscSQLParser.c
浏览文件 @
a3714edf
...
...
@@ -4962,6 +4962,7 @@ static void setCreateDBOption(SCMCreateDbMsg* pMsg, SCreateDBInfo* pCreateDb) {
pMsg
->
commitTime
=
htonl
(
pCreateDb
->
commitTime
);
pMsg
->
minRowsPerFileBlock
=
htonl
(
pCreateDb
->
minRowsPerBlock
);
pMsg
->
maxRowsPerFileBlock
=
htonl
(
pCreateDb
->
maxRowsPerBlock
);
pMsg
->
fsyncPeriod
=
htonl
(
pCreateDb
->
fsyncPeriod
);
pMsg
->
compression
=
pCreateDb
->
compressionLevel
;
pMsg
->
walLevel
=
(
char
)
pCreateDb
->
walLevel
;
pMsg
->
replications
=
pCreateDb
->
replica
;
...
...
@@ -5529,6 +5530,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate) {
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg
);
}
val
=
htonl
(
pCreate
->
fsyncPeriod
);
if
(
val
!=
-
1
&&
(
val
<
TSDB_MIN_FSYNC_PERIOD
||
val
>
TSDB_MAX_FSYNC_PERIOD
))
{
snprintf
(
msg
,
tListLen
(
msg
),
"invalid db option fsyncPeriod: %d valid range: [%d, %d]"
,
val
,
TSDB_MIN_FSYNC_PERIOD
,
TSDB_MAX_FSYNC_PERIOD
);
return
invalidSqlErrMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg
);
}
if
(
pCreate
->
compression
!=
-
1
&&
(
pCreate
->
compression
<
TSDB_MIN_COMP_LEVEL
||
pCreate
->
compression
>
TSDB_MAX_COMP_LEVEL
))
{
snprintf
(
msg
,
tListLen
(
msg
),
"invalid db option compression: %d valid range: [%d, %d]"
,
pCreate
->
compression
,
...
...
src/common/inc/tglobal.h
浏览文件 @
a3714edf
...
...
@@ -80,6 +80,7 @@ extern int16_t tsCommitTime; // seconds
extern
int32_t
tsTimePrecision
;
extern
int16_t
tsCompression
;
extern
int16_t
tsWAL
;
extern
int32_t
tsFsyncPeriod
;
extern
int32_t
tsReplications
;
// balance
...
...
src/common/src/tdataformat.c
浏览文件 @
a3714edf
...
...
@@ -384,9 +384,11 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
}
void
tdResetDataCols
(
SDataCols
*
pCols
)
{
pCols
->
numOfRows
=
0
;
for
(
int
i
=
0
;
i
<
pCols
->
maxCols
;
i
++
)
{
dataColReset
(
pCols
->
cols
+
i
);
if
(
pCols
!=
NULL
)
{
pCols
->
numOfRows
=
0
;
for
(
int
i
=
0
;
i
<
pCols
->
maxCols
;
i
++
)
{
dataColReset
(
pCols
->
cols
+
i
);
}
}
}
...
...
src/common/src/tglobal.c
浏览文件 @
a3714edf
...
...
@@ -110,6 +110,7 @@ int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int32_t
tsTimePrecision
=
TSDB_DEFAULT_PRECISION
;
int16_t
tsCompression
=
TSDB_DEFAULT_COMP_LEVEL
;
int16_t
tsWAL
=
TSDB_DEFAULT_WAL_LEVEL
;
int32_t
tsFsyncPeriod
=
TSDB_DEFAULT_FSYNC_PERIOD
;
int32_t
tsReplications
=
TSDB_DEFAULT_DB_REPLICA_OPTION
;
int32_t
tsMaxVgroupsPerDb
=
0
;
int32_t
tsMinTablePerVnode
=
100
;
...
...
@@ -715,6 +716,16 @@ static void doInitGlobalConfig() {
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"fsync"
;
cfg
.
ptr
=
&
tsFsyncPeriod
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
TSDB_MIN_FSYNC_PERIOD
;
cfg
.
maxValue
=
TSDB_MAX_FSYNC_PERIOD
;
cfg
.
ptrLength
=
0
;
cfg
.
unitType
=
TAOS_CFG_UTYPE_NONE
;
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"replica"
;
cfg
.
ptr
=
&
tsReplications
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
...
...
src/connector/jdbc/src/test/java/com/taosdata/jdbc/BatchInsertTest.java
浏览文件 @
a3714edf
...
...
@@ -62,7 +62,7 @@ public class BatchInsertTest extends BaseTest {
@Test
public
void
testBatchInsert
()
throws
SQLException
{
ExecutorService
executorService
=
Executors
.
newFixedThreadPool
(
numOfTables
);
ExecutorService
executorService
=
Executors
.
newFixedThreadPool
(
numOfTables
);
for
(
int
i
=
0
;
i
<
numOfTables
;
i
++)
{
final
int
index
=
i
;
...
...
@@ -94,6 +94,14 @@ public class BatchInsertTest extends BaseTest {
});
}
executorService
.
shutdown
();
try
{
executorService
.
awaitTermination
(
Long
.
MAX_VALUE
,
TimeUnit
.
NANOSECONDS
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
Statement
statement
=
connection
.
createStatement
();
ResultSet
rs
=
statement
.
executeQuery
(
"select * from meters"
);
int
num
=
0
;
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
a3714edf
...
...
@@ -401,6 +401,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate
->
cfg
.
daysToKeep
=
htonl
(
pCreate
->
cfg
.
daysToKeep
);
pCreate
->
cfg
.
minRowsPerFileBlock
=
htonl
(
pCreate
->
cfg
.
minRowsPerFileBlock
);
pCreate
->
cfg
.
maxRowsPerFileBlock
=
htonl
(
pCreate
->
cfg
.
maxRowsPerFileBlock
);
pCreate
->
cfg
.
fsyncPeriod
=
htonl
(
pCreate
->
cfg
.
fsyncPeriod
);
pCreate
->
cfg
.
commitTime
=
htonl
(
pCreate
->
cfg
.
commitTime
);
for
(
int32_t
j
=
0
;
j
<
pCreate
->
cfg
.
replications
;
++
j
)
{
...
...
src/inc/taosdef.h
浏览文件 @
a3714edf
...
...
@@ -332,6 +332,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_WAL_LEVEL 2
#define TSDB_DEFAULT_WAL_LEVEL 1
#define TSDB_MIN_FSYNC_PERIOD 0
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
#define TSDB_MIN_DB_REPLICA_OPTION 1
#define TSDB_MAX_DB_REPLICA_OPTION 3
#define TSDB_DEFAULT_DB_REPLICA_OPTION 1
...
...
src/inc/taosmsg.h
浏览文件 @
a3714edf
...
...
@@ -515,6 +515,7 @@ typedef struct {
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
uint8_t
precision
;
// time resolution
int8_t
compression
;
int8_t
walLevel
;
...
...
@@ -608,6 +609,7 @@ typedef struct {
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int8_t
precision
;
int8_t
compression
;
int8_t
walLevel
;
...
...
src/inc/ttokendef.h
浏览文件 @
a3714edf
...
...
@@ -110,117 +110,117 @@
#define TK_BLOCKS 92
#define TK_CTIME 93
#define TK_WAL 94
#define TK_
COMP
95
#define TK_
PRECISION
96
#define TK_
LP
97
#define TK_
R
P 98
#define TK_
TAGS
99
#define TK_
USING
100
#define TK_
AS
101
#define TK_
COMMA
102
#define TK_
NULL
103
#define TK_
SELECT
104
#define TK_
UNION
105
#define TK_
ALL
106
#define TK_
FROM
107
#define TK_
VARIABLE
108
#define TK_
INTERVAL
109
#define TK_
FILL
110
#define TK_
SLIDING
111
#define TK_
ORDER
112
#define TK_
BY
113
#define TK_
ASC
114
#define TK_
DESC
115
#define TK_
GROUP
116
#define TK_
HAVING
117
#define TK_
LIMIT
118
#define TK_
OFFSET
119
#define TK_
SLIMI
T 120
#define TK_S
OFFSET
121
#define TK_
WHERE
122
#define TK_
NOW
123
#define TK_
RESET
124
#define TK_
QUERY
125
#define TK_
ADD
126
#define TK_
COLUMN
127
#define TK_
TAG
128
#define TK_
CHANGE
129
#define TK_
SET
130
#define TK_
KILL
131
#define TK_
CONNECTION
132
#define TK_
STREAM
133
#define TK_
COLON
134
#define TK_
ABORT
135
#define TK_A
FTER
136
#define TK_A
TTACH
137
#define TK_
BEFORE
138
#define TK_BE
GIN
139
#define TK_
CASCADE
140
#define TK_C
LUSTER
141
#define TK_C
ONFLICT
142
#define TK_CO
PY
143
#define TK_
DEFERRED
144
#define TK_DE
LIMITERS
145
#define TK_DE
TACH
146
#define TK_
EACH
147
#define TK_E
ND
148
#define TK_E
XPLAIN
149
#define TK_
FAIL
150
#define TK_F
OR
151
#define TK_
IGNORE
152
#define TK_I
MMEDIATE
153
#define TK_I
NITIALLY
154
#define TK_IN
STEAD
155
#define TK_
MATCH
156
#define TK_
KEY
157
#define TK_
OF
158
#define TK_
RAISE
159
#define TK_R
EPLACE
160
#define TK_RE
STRICT
161
#define TK_R
OW
162
#define TK_
STATEMENT
163
#define TK_
TRIGGER
164
#define TK_
VIEW
165
#define TK_
COUNT
166
#define TK_
SUM
167
#define TK_
AVG
168
#define TK_
MIN
169
#define TK_M
AX
170
#define TK_
FIRST
171
#define TK_
LAST
172
#define TK_
TOP
173
#define TK_
BOTTOM
174
#define TK_
STDDEV
175
#define TK_
PERCENTILE
176
#define TK_
APERCENTILE
177
#define TK_
LEASTSQUARES
178
#define TK_
HISTOGRAM
179
#define TK_
DIFF
180
#define TK_
SPREAD
181
#define TK_
TWA
182
#define TK_
INTERP
183
#define TK_
LAST_ROW
184
#define TK_
RATE
185
#define TK_
IRATE
186
#define TK_
SUM_RATE
187
#define TK_SUM_
IRATE
188
#define TK_
AVG_RATE
189
#define TK_AVG_
IRATE
190
#define TK_
TBID
191
#define TK_
SEMI
192
#define TK_
NONE
193
#define TK_
PREV
194
#define TK_
LINEAR
195
#define TK_
IMPORT
196
#define TK_
METRIC
197
#define TK_
TBNAME
198
#define TK_
JOIN
199
#define TK_
METRICS
200
#define TK_
STABLE
201
#define TK_
INSERT
202
#define TK_IN
TO
203
#define TK_
VALUES
204
#define TK_
FSYNC
95
#define TK_
COMP
96
#define TK_
PRECISION
97
#define TK_
L
P 98
#define TK_
RP
99
#define TK_
TAGS
100
#define TK_
USING
101
#define TK_
AS
102
#define TK_
COMMA
103
#define TK_
NULL
104
#define TK_
SELECT
105
#define TK_
UNION
106
#define TK_
ALL
107
#define TK_
FROM
108
#define TK_
VARIABLE
109
#define TK_
INTERVAL
110
#define TK_
FILL
111
#define TK_
SLIDING
112
#define TK_
ORDER
113
#define TK_
BY
114
#define TK_
ASC
115
#define TK_
DESC
116
#define TK_
GROUP
117
#define TK_
HAVING
118
#define TK_
LIMIT
119
#define TK_
OFFSE
T 120
#define TK_S
LIMIT
121
#define TK_
SOFFSET
122
#define TK_
WHERE
123
#define TK_
NOW
124
#define TK_
RESET
125
#define TK_
QUERY
126
#define TK_
ADD
127
#define TK_
COLUMN
128
#define TK_
TAG
129
#define TK_
CHANGE
130
#define TK_
SET
131
#define TK_
KILL
132
#define TK_
CONNECTION
133
#define TK_
STREAM
134
#define TK_
COLON
135
#define TK_A
BORT
136
#define TK_A
FTER
137
#define TK_
ATTACH
138
#define TK_BE
FORE
139
#define TK_
BEGIN
140
#define TK_C
ASCADE
141
#define TK_C
LUSTER
142
#define TK_CO
NFLICT
143
#define TK_
COPY
144
#define TK_DE
FERRED
145
#define TK_DE
LIMITERS
146
#define TK_
DETACH
147
#define TK_E
ACH
148
#define TK_E
ND
149
#define TK_
EXPLAIN
150
#define TK_F
AIL
151
#define TK_
FOR
152
#define TK_I
GNORE
153
#define TK_I
MMEDIATE
154
#define TK_IN
ITIALLY
155
#define TK_
INSTEAD
156
#define TK_
MATCH
157
#define TK_
KEY
158
#define TK_
OF
159
#define TK_R
AISE
160
#define TK_RE
PLACE
161
#define TK_R
ESTRICT
162
#define TK_
ROW
163
#define TK_
STATEMENT
164
#define TK_
TRIGGER
165
#define TK_
VIEW
166
#define TK_
COUNT
167
#define TK_
SUM
168
#define TK_
AVG
169
#define TK_M
IN
170
#define TK_
MAX
171
#define TK_
FIRST
172
#define TK_
LAST
173
#define TK_
TOP
174
#define TK_
BOTTOM
175
#define TK_
STDDEV
176
#define TK_
PERCENTILE
177
#define TK_
APERCENTILE
178
#define TK_
LEASTSQUARES
179
#define TK_
HISTOGRAM
180
#define TK_
DIFF
181
#define TK_
SPREAD
182
#define TK_
TWA
183
#define TK_
INTERP
184
#define TK_
LAST_ROW
185
#define TK_
RATE
186
#define TK_
IRATE
187
#define TK_SUM_
RATE
188
#define TK_
SUM_IRATE
189
#define TK_AVG_
RATE
190
#define TK_
AVG_IRATE
191
#define TK_
TBID
192
#define TK_
SEMI
193
#define TK_
NONE
194
#define TK_
PREV
195
#define TK_
LINEAR
196
#define TK_
IMPORT
197
#define TK_
METRIC
198
#define TK_
TBNAME
199
#define TK_
JOIN
200
#define TK_
METRICS
201
#define TK_
STABLE
202
#define TK_IN
SERT
203
#define TK_
INTO
204
#define TK_VALUES 205
#define TK_SPACE 300
#define TK_COMMENT 301
...
...
src/inc/twal.h
浏览文件 @
a3714edf
...
...
@@ -35,6 +35,7 @@ typedef struct {
typedef
struct
{
int8_t
walLevel
;
// wal level
int32_t
fsyncPeriod
;
// millisecond
int8_t
wals
;
// number of WAL files;
int8_t
keep
;
// keep the wal file when closed
}
SWalCfg
;
...
...
src/kit/taosdump/taosdump.c
浏览文件 @
a3714edf
...
...
@@ -179,8 +179,8 @@ static struct argp_option options[] = {
{
"start-time"
,
'S'
,
"START_TIME"
,
0
,
"Start time to dump."
,
3
},
{
"end-time"
,
'E'
,
"END_TIME"
,
0
,
"End time to dump."
,
3
},
{
"data-batch"
,
'N'
,
"DATA_BATCH"
,
0
,
"Number of data point per insert statement. Default is 1."
,
3
},
{
"table-batch"
,
'
T
'
,
"TABLE_BATCH"
,
0
,
"Number of table dumpout into one output file. Default is 1."
,
3
},
{
"thread_num"
,
'
t
'
,
"THREAD_NUM"
,
0
,
"Number of thread for dump in file. Default is 5."
,
3
},
{
"table-batch"
,
'
t
'
,
"TABLE_BATCH"
,
0
,
"Number of table dumpout into one output file. Default is 1."
,
3
},
{
"thread_num"
,
'
T
'
,
"THREAD_NUM"
,
0
,
"Number of thread for dump in file. Default is 5."
,
3
},
{
"allow-sys"
,
'a'
,
0
,
0
,
"Allow to dump sys database"
,
3
},
{
0
}};
...
...
@@ -304,10 +304,10 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
case
'N'
:
arguments
->
data_batch
=
atoi
(
arg
);
break
;
case
'
T
'
:
case
'
t
'
:
arguments
->
table_batch
=
atoi
(
arg
);
break
;
case
'
t
'
:
case
'
T
'
:
arguments
->
thread_num
=
atoi
(
arg
);
break
;
case
OPT_ABORT
:
...
...
@@ -406,7 +406,7 @@ int main(int argc, char *argv[]) {
printf
(
"password: %s
\n
"
,
tsArguments
.
password
);
printf
(
"port: %u
\n
"
,
tsArguments
.
port
);
printf
(
"cversion: %s
\n
"
,
tsArguments
.
cversion
);
printf
(
"mysqlFlag: %d"
,
tsArguments
.
mysqlFlag
);
printf
(
"mysqlFlag: %d
\n
"
,
tsArguments
.
mysqlFlag
);
printf
(
"outpath: %s
\n
"
,
tsArguments
.
outpath
);
printf
(
"inpath: %s
\n
"
,
tsArguments
.
inpath
);
printf
(
"encode: %s
\n
"
,
tsArguments
.
encode
);
...
...
@@ -821,7 +821,7 @@ _exit_failure:
return
-
1
;
}
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
,
TAOS
*
taosCon
)
{
int
taosGetTableDes
(
char
*
table
,
STableDef
*
tableDes
,
TAOS
*
taosCon
,
bool
isSuperTable
)
{
TAOS_ROW
row
=
NULL
;
TAOS_RES
*
tmpResult
=
NULL
;
int
count
=
0
;
...
...
@@ -832,6 +832,13 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
return
-
1
;
}
char
*
tbuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
if
(
tbuf
==
NULL
)
{
fprintf
(
stderr
,
"failed to allocate memory
\n
"
);
free
(
tempCommand
);
return
-
1
;
}
sprintf
(
tempCommand
,
"describe %s"
,
table
);
tmpResult
=
taos_query
(
taosCon
,
tempCommand
);
...
...
@@ -862,6 +869,92 @@ int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) {
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
if
(
isSuperTable
)
{
free
(
tempCommand
);
return
count
;
}
// if chidl-table have tag, using select tagName from table to get tagValue
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
if
(
strcmp
(
tableDes
->
cols
[
i
].
note
,
"TAG"
)
!=
0
)
continue
;
sprintf
(
tempCommand
,
"select %s from %s"
,
tableDes
->
cols
[
i
].
field
,
table
);
tmpResult
=
taos_query
(
taosCon
,
tempCommand
);
code
=
taos_errno
(
tmpResult
);
if
(
code
!=
0
)
{
fprintf
(
stderr
,
"failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
fields
=
taos_fetch_fields
(
tmpResult
);
row
=
taos_fetch_row
(
tmpResult
);
if
(
NULL
==
row
)
{
fprintf
(
stderr
,
" fetch failed to run command %s
\n
"
,
tempCommand
);
free
(
tempCommand
);
taos_free_result
(
tmpResult
);
return
-
1
;
}
switch
(
fields
[
0
].
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%d"
,
((((
int
)(
*
((
char
*
)
row
[
0
])))
==
1
)
?
1
:
0
));
break
;
case
TSDB_DATA_TYPE_TINYINT
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%d"
,
(
int
)(
*
((
char
*
)
row
[
0
])));
break
;
case
TSDB_DATA_TYPE_SMALLINT
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%d"
,
(
int
)(
*
((
short
*
)
row
[
0
])));
break
;
case
TSDB_DATA_TYPE_INT
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%d"
,
*
((
int
*
)
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_BIGINT
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%"
PRId64
""
,
*
((
int64_t
*
)
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_FLOAT
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%f"
,
GET_FLOAT_VAL
(
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%f"
,
GET_DOUBLE_VAL
(
row
[
0
]));
break
;
case
TSDB_DATA_TYPE_BINARY
:
tableDes
->
cols
[
i
].
note
[
0
]
=
'\''
;
converStringToReadable
((
char
*
)
row
[
0
],
fields
[
0
].
bytes
,
tbuf
,
COMMAND_SIZE
);
char
*
pstr
=
stpcpy
(
&
(
tableDes
->
cols
[
i
].
note
[
1
]),
tbuf
);
*
(
pstr
++
)
=
'\''
;
break
;
case
TSDB_DATA_TYPE_NCHAR
:
convertNCharToReadable
((
char
*
)
row
[
0
],
fields
[
0
].
bytes
,
tbuf
,
COMMAND_SIZE
);
sprintf
(
tableDes
->
cols
[
i
].
note
,
"
\'
%s
\'
"
,
tbuf
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
sprintf
(
tableDes
->
cols
[
i
].
note
,
"%"
PRId64
""
,
*
(
int64_t
*
)
row
[
0
]);
#if 0
if (!arguments->mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[0]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
break
;
default:
break
;
}
taos_free_result
(
tmpResult
);
tmpResult
=
NULL
;
}
free
(
tempCommand
);
return
count
;
...
...
@@ -886,23 +979,25 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
,
false
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
// create child-table using super-table
taosDumpCreateMTableClause
(
tableDes
,
metric
,
count
,
fp
);
}
else
{
// dump table definition
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
,
false
);
if
(
count
<
0
)
{
free
(
tableDes
);
return
-
1
;
}
// create normal-table or super-table
taosDumpCreateTableClause
(
tableDes
,
count
,
fp
);
}
...
...
@@ -1033,7 +1128,7 @@ int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) {
exit
(
-
1
);
}
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
);
count
=
taosGetTableDes
(
table
,
tableDes
,
taosCon
,
true
);
if
(
count
<
0
)
{
free
(
tableDes
);
...
...
@@ -1083,7 +1178,6 @@ int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
taos_free_result
(
tmpResult
);
exit
(
-
1
);
}
taos_free_result
(
tmpResult
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
...
...
@@ -1291,14 +1385,16 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
if
(
counter
!=
count_temp
)
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
",
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
", %s"
,
tableDes
->
cols
[
counter
].
note
);
}
}
else
{
if
(
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"binary"
)
==
0
||
strcasecmp
(
tableDes
->
cols
[
counter
].
type
,
"nchar"
)
==
0
)
{
pstr
+=
sprintf
(
pstr
,
"
\'
%s
\'
"
,
tableDes
->
cols
[
counter
].
note
);
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
else
{
pstr
+=
sprintf
(
pstr
,
"%s"
,
tableDes
->
cols
[
counter
].
note
);
}
...
...
@@ -1363,7 +1459,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS*
return
-
1
;
}
numFields
=
taos_field_count
(
t
aosCon
);
numFields
=
taos_field_count
(
t
mpResult
);
assert
(
numFields
>
0
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
tmpResult
);
tbuf
=
(
char
*
)
malloc
(
COMMAND_SIZE
);
...
...
@@ -2015,6 +2111,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c
}
memcpy
(
cmd
+
cmd_len
,
line
,
read_len
);
cmd
[
read_len
+
cmd_len
]
=
'\0'
;
if
(
queryDB
(
taos
,
cmd
))
{
fprintf
(
stderr
,
"error sql: linenu:%d, file:%s
\n
"
,
lineNo
,
fileName
);
}
...
...
src/kit/taosmigrate/taosmigrateVnodeCfg.c
浏览文件 @
a3714edf
...
...
@@ -48,6 +48,7 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
precision
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
precision
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
compression
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
compression
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
walLevel
\"
: %d,
\n
"
,
pVnode
->
walCfg
.
walLevel
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fsync
\"
: %d,
\n
"
,
pVnode
->
walCfg
.
fsyncPeriod
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
replica
\"
: %d,
\n
"
,
pVnode
->
syncCfg
.
replica
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
wals
\"
: %d,
\n
"
,
pVnode
->
walCfg
.
wals
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
quorum
\"
: %d,
\n
"
,
pVnode
->
syncCfg
.
quorum
);
...
...
@@ -212,6 +213,13 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
}
pVnode
->
walCfg
.
walLevel
=
(
int8_t
)
walLevel
->
valueint
;
cJSON
*
fsyncPeriod
=
cJSON_GetObjectItem
(
root
,
"fsync"
);
if
(
!
fsyncPeriod
||
fsyncPeriod
->
type
!=
cJSON_Number
)
{
printf
(
"vgId:%d, failed to read vnode cfg, fsyncPeriod not found
\n
"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
walCfg
.
fsyncPeriod
=
fsyncPeriod
->
valueint
;
cJSON
*
wals
=
cJSON_GetObjectItem
(
root
,
"wals"
);
if
(
!
wals
||
wals
->
type
!=
cJSON_Number
)
{
printf
(
"vgId:%d, failed to read vnode cfg, wals not found
\n
"
,
pVnode
->
vgId
);
...
...
src/mnode/inc/mnodeDef.h
浏览文件 @
a3714edf
...
...
@@ -160,6 +160,7 @@ typedef struct {
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int8_t
precision
;
int8_t
compression
;
int8_t
walLevel
;
...
...
src/mnode/src/mnodeDb.c
浏览文件 @
a3714edf
...
...
@@ -287,14 +287,14 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
pCfg
->
replications
<
TSDB_MIN_DB_REPLICA_OPTION
||
pCfg
->
replications
>
TSDB_MAX_DB_REPLICA_OPTION
)
{
mError
(
"invalid db option replications:%d valid range: [%d, %d]"
,
pCfg
->
replications
,
TSDB_MIN_DB_REPLICA_OPTION
,
TSDB_MAX_DB_REPLICA_OPTION
);
if
(
pCfg
->
fsyncPeriod
<
TSDB_MIN_FSYNC_PERIOD
||
pCfg
->
fsyncPeriod
>
TSDB_MAX_FSYNC_PERIOD
)
{
mError
(
"invalid db option fsyncPeriod:%d, valid range: [%d, %d]"
,
pCfg
->
fsyncPeriod
,
TSDB_MIN_FSYNC_PERIOD
,
TSDB_MAX_FSYNC_PERIOD
);
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
pCfg
->
walLevel
<
TSDB_MIN_WAL_LEVEL
)
{
mError
(
"invalid db option walLevel:%d must be greater than 0"
,
pCfg
->
walLevel
);
if
(
pCfg
->
replications
<
TSDB_MIN_DB_REPLICA_OPTION
||
pCfg
->
replications
>
TSDB_MAX_DB_REPLICA_OPTION
)
{
mError
(
"invalid db option replications:%d valid range: [%d, %d]"
,
pCfg
->
replications
,
TSDB_MIN_DB_REPLICA_OPTION
,
TSDB_MAX_DB_REPLICA_OPTION
);
return
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
...
...
@@ -318,6 +318,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
daysToKeep2
<
0
)
pCfg
->
daysToKeep2
=
pCfg
->
daysToKeep
;
if
(
pCfg
->
minRowsPerFileBlock
<
0
)
pCfg
->
minRowsPerFileBlock
=
tsMinRowsInFileBlock
;
if
(
pCfg
->
maxRowsPerFileBlock
<
0
)
pCfg
->
maxRowsPerFileBlock
=
tsMaxRowsInFileBlock
;
if
(
pCfg
->
fsyncPeriod
<
0
)
pCfg
->
fsyncPeriod
=
tsFsyncPeriod
;
if
(
pCfg
->
commitTime
<
0
)
pCfg
->
commitTime
=
tsCommitTime
;
if
(
pCfg
->
precision
<
0
)
pCfg
->
precision
=
tsTimePrecision
;
if
(
pCfg
->
compression
<
0
)
pCfg
->
compression
=
tsCompression
;
...
...
@@ -367,6 +368,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
.
daysToKeep2
=
pCreate
->
daysToKeep2
,
.
minRowsPerFileBlock
=
pCreate
->
minRowsPerFileBlock
,
.
maxRowsPerFileBlock
=
pCreate
->
maxRowsPerFileBlock
,
.
fsyncPeriod
=
pCreate
->
fsyncPeriod
,
.
commitTime
=
pCreate
->
commitTime
,
.
precision
=
pCreate
->
precision
,
.
compression
=
pCreate
->
compression
,
...
...
@@ -559,6 +561,12 @@ static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"fsync"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
1
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TINYINT
;
strcpy
(
pSchema
[
cols
].
name
,
"comp"
);
...
...
@@ -682,6 +690,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
*
(
int8_t
*
)
pWrite
=
pDb
->
cfg
.
walLevel
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
pDb
->
cfg
.
fsyncPeriod
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int8_t
*
)
pWrite
=
pDb
->
cfg
.
compression
;
cols
++
;
...
...
@@ -758,6 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
pCreate
->
daysToKeep1
=
htonl
(
pCreate
->
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pCreate
->
daysToKeep2
);
pCreate
->
commitTime
=
htonl
(
pCreate
->
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pCreate
->
fsyncPeriod
);
pCreate
->
minRowsPerFileBlock
=
htonl
(
pCreate
->
minRowsPerFileBlock
);
pCreate
->
maxRowsPerFileBlock
=
htonl
(
pCreate
->
maxRowsPerFileBlock
);
...
...
@@ -785,6 +798,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
int32_t
minRows
=
htonl
(
pAlter
->
minRowsPerFileBlock
);
int32_t
maxRows
=
htonl
(
pAlter
->
maxRowsPerFileBlock
);
int32_t
commitTime
=
htonl
(
pAlter
->
commitTime
);
int32_t
fsyncPeriod
=
htonl
(
pAlter
->
fsyncPeriod
);
int8_t
compression
=
pAlter
->
compression
;
int8_t
walLevel
=
pAlter
->
walLevel
;
int8_t
replications
=
pAlter
->
replications
;
...
...
@@ -861,6 +875,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
terrno
=
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
fsyncPeriod
>=
0
&&
fsyncPeriod
!=
pDb
->
cfg
.
fsyncPeriod
)
{
mError
(
"db:%s, can't alter fsyncPeriod option"
,
pDb
->
name
);
terrno
=
TSDB_CODE_MND_INVALID_DB_OPTION
;
}
if
(
replications
>
0
&&
replications
!=
pDb
->
cfg
.
replications
)
{
mDebug
(
"db:%s, replications:%d change to %d"
,
pDb
->
name
,
pDb
->
cfg
.
replications
,
replications
);
newCfg
.
replications
=
replications
;
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
a3714edf
...
...
@@ -170,7 +170,7 @@ static void *sdbGetTableFromId(int32_t tableId) {
}
static
int32_t
sdbInitWal
()
{
SWalCfg
walCfg
=
{.
walLevel
=
2
,
.
wals
=
2
,
.
keep
=
1
};
SWalCfg
walCfg
=
{.
walLevel
=
2
,
.
wals
=
2
,
.
keep
=
1
,
.
fsyncPeriod
=
0
};
char
temp
[
TSDB_FILENAME_LEN
];
sprintf
(
temp
,
"%s/wal"
,
tsMnodeDir
);
tsSdbObj
.
wal
=
walOpen
(
temp
,
&
walCfg
);
...
...
@@ -252,13 +252,15 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
int32_t
processedCount
=
atomic_add_fetch_32
(
&
pOper
->
processedCount
,
1
);
if
(
processedCount
<=
1
)
{
if
(
pMsg
!=
NULL
)
{
sdbDebug
(
"app:%p:%p, waiting for confirm this operation, count:%d"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
processedCount
);
sdbDebug
(
"app:%p:%p, waiting for confirm this operation, count:%d result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
processedCount
,
tstrerror
(
code
));
}
return
;
}
if
(
pMsg
!=
NULL
)
{
sdbDebug
(
"app:%p:%p, is confirmed and will do callback func"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
);
sdbDebug
(
"app:%p:%p, is confirmed and will do callback func, result:%s"
,
pMsg
->
rpcMsg
.
ahandle
,
pMsg
,
tstrerror
(
code
));
}
if
(
pOper
->
cb
!=
NULL
)
{
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
a3714edf
...
...
@@ -757,6 +757,7 @@ SMDCreateVnodeMsg *mnodeBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg
->
daysToKeep2
=
htonl
(
pDb
->
cfg
.
daysToKeep2
);
pCfg
->
minRowsPerFileBlock
=
htonl
(
pDb
->
cfg
.
minRowsPerFileBlock
);
pCfg
->
maxRowsPerFileBlock
=
htonl
(
pDb
->
cfg
.
maxRowsPerFileBlock
);
pCfg
->
fsyncPeriod
=
htonl
(
pDb
->
cfg
.
fsyncPeriod
);
pCfg
->
commitTime
=
htonl
(
pDb
->
cfg
.
commitTime
);
pCfg
->
precision
=
pDb
->
cfg
.
precision
;
pCfg
->
compression
=
pDb
->
cfg
.
compression
;
...
...
src/query/inc/qsqlparser.h
浏览文件 @
a3714edf
...
...
@@ -116,6 +116,7 @@ typedef struct SCreateDBInfo {
int32_t
daysPerFile
;
int32_t
minRowsPerBlock
;
int32_t
maxRowsPerBlock
;
int32_t
fsyncPeriod
;
int64_t
commitTime
;
int32_t
walLevel
;
int32_t
compressionLevel
;
...
...
src/query/inc/sql.y
浏览文件 @
a3714edf
...
...
@@ -221,6 +221,7 @@ maxrows(Y) ::= MAXROWS INTEGER(X). { Y = X; }
blocks(Y) ::= BLOCKS INTEGER(X). { Y = X; }
ctime(Y) ::= CTIME INTEGER(X). { Y = X; }
wal(Y) ::= WAL INTEGER(X). { Y = X; }
fsync(Y) ::= FSYNC INTEGER(X). { Y = X; }
comp(Y) ::= COMP INTEGER(X). { Y = X; }
prec(Y) ::= PRECISION STRING(X). { Y = X; }
...
...
@@ -236,6 +237,7 @@ db_optr(Y) ::= db_optr(Z) maxrows(X). { Y = Z; Y.maxRowsPerBlock = strtod
db_optr(Y) ::= db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) ctime(X). { Y = Z; Y.commitTime = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
db_optr(Y) ::= db_optr(Z) prec(X). { Y = Z; Y.precision = X; }
db_optr(Y) ::= db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
...
...
@@ -249,6 +251,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtod(X.z, NULL, 10); }
%type typename {TAOS_FIELD}
typename(A) ::= ids(X). {
...
...
src/query/src/qExecutor.c
浏览文件 @
a3714edf
...
...
@@ -2202,7 +2202,13 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
while
(
true
)
{
if
(
!
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
break
;
}
summary
->
totalBlocks
+=
1
;
if
(
IS_QUERY_KILLED
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
...
...
@@ -3188,6 +3194,9 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
// add ref for table
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
if
(
pRuntimeEnv
->
pSecQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pRuntimeEnv
);
...
...
@@ -3260,6 +3269,9 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
}
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
if
(
pRuntimeEnv
->
pSecQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
pRuntimeEnv
->
windowResInfo
.
curIndex
=
qstatus
.
windowIndex
;
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
...
...
@@ -3916,7 +3928,14 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT
pQueryHandle
=
pRuntimeEnv
->
pQueryHandle
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
while
(
true
)
{
if
(
!
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
break
;
}
if
(
IS_QUERY_KILLED
(
GET_QINFO_ADDR
(
pRuntimeEnv
)))
{
finalizeQueryResult
(
pRuntimeEnv
);
// clean up allocated resource during query
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
...
...
@@ -3960,7 +3979,14 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
SDataBlockInfo
blockInfo
=
SDATA_BLOCK_INITIALIZER
;
while
(
tsdbNextDataBlock
(
pRuntimeEnv
->
pQueryHandle
))
{
while
(
true
)
{
if
(
!
tsdbNextDataBlock
(
pRuntimeEnv
->
pQueryHandle
))
{
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
break
;
}
tsdbRetrieveDataBlockInfo
(
pRuntimeEnv
->
pQueryHandle
,
&
blockInfo
);
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
...
...
@@ -4059,16 +4085,16 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
return
true
;
}
static
void
setupQueryHandle
(
void
*
tsdb
,
SQInfo
*
pQInfo
,
bool
isSTableQuery
)
{
static
int32_t
setupQueryHandle
(
void
*
tsdb
,
SQInfo
*
pQInfo
,
bool
isSTableQuery
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
if
(
onlyQueryTags
(
pQuery
))
{
return
;
return
TSDB_CODE_SUCCESS
;
}
if
(
isSTableQuery
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isFixedOutputQuery
(
pRuntimeEnv
)))
{
return
;
return
TSDB_CODE_SUCCESS
;
}
STsdbQueryCond
cond
=
{
...
...
@@ -4090,6 +4116,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
cond
.
twindow
=
pCheckInfo
->
win
;
}
terrno
=
TSDB_CODE_SUCCESS
;
if
(
isFirstLastRowQuery
(
pQuery
))
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryLastRow
(
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
}
else
if
(
isPointInterpoQuery
(
pQuery
))
{
...
...
@@ -4097,6 +4124,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
}
else
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
}
return
terrno
;
}
static
SFillColInfo
*
taosCreateFillColInfo
(
SQuery
*
pQuery
)
{
...
...
@@ -4133,7 +4161,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
setScanLimitationByResultBuffer
(
pQuery
);
changeExecuteScanOrder
(
pQInfo
,
false
);
setupQueryHandle
(
tsdb
,
pQInfo
,
isSTableQuery
);
code
=
setupQueryHandle
(
tsdb
,
pQInfo
,
isSTableQuery
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
...
...
@@ -4257,7 +4288,14 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
while
(
true
)
{
if
(
!
tsdbNextDataBlock
(
pQueryHandle
))
{
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
break
;
}
summary
->
totalBlocks
+=
1
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
...
...
@@ -4338,6 +4376,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
taosArrayDestroy
(
tx
);
taosArrayDestroy
(
g1
);
if
(
pRuntimeEnv
->
pQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
cur
.
vgroupIndex
==
-
1
)
{
...
...
@@ -4405,6 +4446,12 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
else
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryRowsInExternalWindow
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
}
taosArrayDestroy
(
tx
);
taosArrayDestroy
(
g1
);
if
(
pRuntimeEnv
->
pQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
initCtxOutputBuf
(
pRuntimeEnv
);
...
...
@@ -4469,6 +4516,9 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
taosArrayDestroy
(
g1
);
taosArrayDestroy
(
tx
);
if
(
pRuntimeEnv
->
pQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
SArray
*
s
=
tsdbGetQueriedTableList
(
pRuntimeEnv
->
pQueryHandle
);
assert
(
taosArrayGetSize
(
s
)
>=
1
);
...
...
@@ -4664,6 +4714,9 @@ static void doSaveContext(SQInfo *pQInfo) {
pRuntimeEnv
->
prevGroupId
=
INT32_MIN
;
pRuntimeEnv
->
pSecQueryHandle
=
tsdbQueryTables
(
pQInfo
->
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
if
(
pRuntimeEnv
->
pSecQueryHandle
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
terrno
);
}
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
switchCtxOrder
(
pRuntimeEnv
);
...
...
src/query/src/qparserImpl.c
浏览文件 @
a3714edf
...
...
@@ -896,6 +896,7 @@ void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo) {
pDBInfo
->
compressionLevel
=
-
1
;
pDBInfo
->
walLevel
=
-
1
;
pDBInfo
->
fsyncPeriod
=
-
1
;
pDBInfo
->
commitTime
=
-
1
;
pDBInfo
->
maxTablesPerVnode
=
-
1
;
...
...
src/query/src/qtokenizer.c
浏览文件 @
a3714edf
...
...
@@ -124,6 +124,7 @@ static SKeyword keywordTable[] = {
{
"CACHE"
,
TK_CACHE
},
{
"CTIME"
,
TK_CTIME
},
{
"WAL"
,
TK_WAL
},
{
"FSYNC"
,
TK_FSYNC
},
{
"COMP"
,
TK_COMP
},
{
"PRECISION"
,
TK_PRECISION
},
{
"LP"
,
TK_LP
},
...
...
src/query/src/sql.c
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
src/tsdb/inc/tsdbMain.h
浏览文件 @
a3714edf
...
...
@@ -96,6 +96,11 @@ typedef struct {
}
STsdbBufPool
;
// ------------------ tsdbMemTable.c
typedef
struct
{
STable
*
pTable
;
SSkipListIterator
*
pIter
;
}
SCommitIter
;
typedef
struct
{
uint64_t
uid
;
TSKEY
keyFirst
;
...
...
@@ -206,10 +211,10 @@ typedef struct {
int64_t
offset
:
63
;
int32_t
algorithm
:
8
;
int32_t
numOfRows
:
24
;
int32_t
sversion
;
int32_t
len
;
int32_t
keyLen
;
// key column length, keyOffset = offset+sizeof(SCompData)+sizeof(SCompCol)*numOfCols
int16_t
numOfSubBlocks
;
int16_t
numOfCols
;
int16_t
numOfCols
;
// not including timestamp column
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SCompBlock
;
...
...
@@ -377,6 +382,24 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemTable
**
pMem
,
SMemTable
**
pIMem
);
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TSKEY
*
filterKeys
,
int
nFilterKeys
);
static
FORCE_INLINE
SDataRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
-
1
;
return
dataRowKey
(
row
);
}
// ------------------ tsdbFile.c
#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
...
...
@@ -421,25 +444,36 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
#define helperType(h) (h)->type
#define helperRepo(h) (h)->pRepo
#define helperState(h) (h)->state
int
tsdbInitReadHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
int
tsdbInitWriteHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
void
tsdbDestroyHelper
(
SRWHelper
*
pHelper
);
void
tsdbResetHelper
(
SRWHelper
*
pHelper
);
int
tsdbSetAndOpenHelperFile
(
SRWHelper
*
pHelper
,
SFileGroup
*
pGroup
);
int
tsdbCloseHelperFile
(
SRWHelper
*
pHelper
,
bool
hasError
);
void
tsdbSetHelperTable
(
SRWHelper
*
pHelper
,
STable
*
pTable
,
STsdbRepo
*
pRepo
);
int
tsdbWriteDataBlock
(
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
int
tsdbMoveLastBlockIfNeccessary
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
);
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompInfo
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompData
(
SRWHelper
*
phelper
,
SCompBlock
*
pcompblock
,
void
*
target
);
void
tsdbGetDataStatis
(
SRWHelper
*
pHelper
,
SDataStatis
*
pStatis
,
int
numOfCols
);
int
tsdbLoadBlockDataCols
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SCompInfo
*
pCompInfo
,
int16_t
*
colIds
,
int
numOfColIds
);
int
tsdbLoadBlockData
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SCompInfo
*
pCompInfo
);
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
int
tsdbInitReadHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
int
tsdbInitWriteHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
);
void
tsdbDestroyHelper
(
SRWHelper
*
pHelper
);
void
tsdbResetHelper
(
SRWHelper
*
pHelper
);
int
tsdbSetAndOpenHelperFile
(
SRWHelper
*
pHelper
,
SFileGroup
*
pGroup
);
int
tsdbCloseHelperFile
(
SRWHelper
*
pHelper
,
bool
hasError
);
void
tsdbSetHelperTable
(
SRWHelper
*
pHelper
,
STable
*
pTable
,
STsdbRepo
*
pRepo
);
int
tsdbCommitTableData
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
);
int
tsdbMoveLastBlockIfNeccessary
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompInfo
(
SRWHelper
*
pHelper
);
int
tsdbWriteCompIdx
(
SRWHelper
*
pHelper
);
int
tsdbLoadCompIdx
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompInfo
(
SRWHelper
*
pHelper
,
void
*
target
);
int
tsdbLoadCompData
(
SRWHelper
*
phelper
,
SCompBlock
*
pcompblock
,
void
*
target
);
void
tsdbGetDataStatis
(
SRWHelper
*
pHelper
,
SDataStatis
*
pStatis
,
int
numOfCols
);
int
tsdbLoadBlockDataCols
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SCompInfo
*
pCompInfo
,
int16_t
*
colIds
,
int
numOfColIds
);
int
tsdbLoadBlockData
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SCompInfo
*
pCompInfo
);
static
FORCE_INLINE
int
compTSKEY
(
const
void
*
key1
,
const
void
*
key2
)
{
if
(
*
(
TSKEY
*
)
key1
>
*
(
TSKEY
*
)
key2
)
{
return
1
;
}
else
if
(
*
(
TSKEY
*
)
key1
==
*
(
TSKEY
*
)
key2
)
{
return
0
;
}
else
{
return
-
1
;
}
}
// ------------------ tsdbMain.c
#define REPO_ID(r) (r)->config.tsdbId
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
a3714edf
...
...
@@ -18,11 +18,6 @@
#define TSDB_DATA_SKIPLIST_LEVEL 5
typedef
struct
{
STable
*
pTable
;
SSkipListIterator
*
pIter
;
}
SCommitIter
;
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
);
static
void
tsdbFreeBytes
(
STsdbRepo
*
pRepo
,
void
*
ptr
,
int
bytes
);
...
...
@@ -34,14 +29,11 @@ static char * tsdbGetTsTupleKey(const void *data);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
);
static
TSKEY
tsdbNextIterKey
(
SCommitIter
*
pIter
);
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
static
SCommitIter
*
tsdbCreateTableIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyTableIters
(
SCommitIter
*
iters
,
int
maxTables
);
static
int
tsdbReadRowsFromCache
(
STsdbMeta
*
pMeta
,
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
// ---------------- INTERNAL FUNCTIONS ----------------
int
tsdbInsertRowToMem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
)
{
...
...
@@ -252,6 +244,66 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return
0
;
}
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TSKEY
*
filterKeys
,
int
nFilterKeys
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
int
numOfRows
=
0
;
TSKEY
keyNext
=
0
;
int
filterIter
=
0
;
if
(
nFilterKeys
!=
0
)
{
// for filter purpose
ASSERT
(
filterKeys
!=
NULL
);
keyNext
=
tsdbNextIterKey
(
pIter
);
if
(
keyNext
<
0
||
keyNext
>
maxKey
)
return
numOfRows
;
void
*
ptr
=
taosbsearch
((
void
*
)(
&
keyNext
),
(
void
*
)
filterKeys
,
nFilterKeys
,
sizeof
(
TSKEY
),
compTSKEY
,
TD_GE
);
filterIter
=
(
ptr
==
NULL
)
?
nFilterKeys
:
(
POINTER_DISTANCE
(
ptr
,
filterKeys
)
/
sizeof
(
TSKEY
));
}
do
{
if
(
numOfRows
>=
maxRowsToRead
)
break
;
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
break
;
keyNext
=
dataRowKey
(
row
);
if
(
keyNext
<
0
||
keyNext
>
maxKey
)
break
;
bool
keyFiltered
=
false
;
if
(
nFilterKeys
!=
0
)
{
while
(
true
)
{
if
(
filterIter
>=
nFilterKeys
)
break
;
if
(
keyNext
==
filterKeys
[
filterIter
])
{
keyFiltered
=
true
;
filterIter
++
;
break
;
}
else
if
(
keyNext
<
filterKeys
[
filterIter
])
{
break
;
}
else
{
filterIter
++
;
}
}
}
if
(
!
keyFiltered
)
{
if
(
pCols
)
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
dataRowVersion
(
row
));
if
(
pSchema
==
NULL
)
{
ASSERT
(
0
);
}
}
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pCols
);
}
numOfRows
++
;
}
}
while
(
tSkipListIterNext
(
pIter
));
return
numOfRows
;
}
// ---------------- LOCAL FUNCTIONS ----------------
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
!=
NULL
);
...
...
@@ -378,7 +430,7 @@ static void *tsdbCommitData(void *arg) {
// Create the iterator to read from cache
if
(
pMem
->
numOfRows
>
0
)
{
iters
=
tsdbCreate
Table
Iters
(
pRepo
);
iters
=
tsdbCreate
Commit
Iters
(
pRepo
);
if
(
iters
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create commit iterator since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_exit
;
...
...
@@ -418,7 +470,7 @@ static void *tsdbCommitData(void *arg) {
_exit:
tdFreeDataCols
(
pDataCols
);
tsdbDestroy
Table
Iters
(
iters
,
pCfg
->
maxTables
);
tsdbDestroy
Commit
Iters
(
iters
,
pCfg
->
maxTables
);
tsdbDestroyHelper
(
&
whelper
);
tsdbEndCommit
(
pRepo
);
tsdbInfo
(
"vgId:%d commit over"
,
pRepo
->
config
.
tsdbId
);
...
...
@@ -479,19 +531,9 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
);
}
static
TSKEY
tsdbNextIterKey
(
SCommitIter
*
pIter
)
{
if
(
pIter
==
NULL
)
return
-
1
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
->
pIter
);
if
(
node
==
NULL
)
return
-
1
;
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
return
dataRowKey
(
row
);
}
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
TSKEY
nextKey
=
tsdbNextIterKey
(
iters
+
i
);
TSKEY
nextKey
=
tsdbNextIterKey
(
(
iters
+
i
)
->
pIter
);
if
(
nextKey
>
0
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
}
return
0
;
...
...
@@ -504,7 +546,6 @@ static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
)
{
char
*
dataDir
=
NULL
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
NULL
;
...
...
@@ -549,33 +590,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
if
(
pIter
->
pIter
!=
NULL
)
{
tdInitDataCols
(
pDataCols
,
tsdbGetTableSchemaImpl
(
pIter
->
pTable
,
false
,
false
,
-
1
));
int
maxRowsToRead
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
int
nLoop
=
0
;
while
(
true
)
{
int
rowsRead
=
tsdbReadRowsFromCache
(
pMeta
,
pIter
->
pTable
,
pIter
->
pIter
,
maxKey
,
maxRowsToRead
,
pDataCols
);
ASSERT
(
rowsRead
>=
0
);
if
(
pDataCols
->
numOfRows
==
0
)
break
;
nLoop
++
;
ASSERT
(
dataColsKeyFirst
(
pDataCols
)
>=
minKey
&&
dataColsKeyFirst
(
pDataCols
)
<=
maxKey
);
ASSERT
(
dataColsKeyLast
(
pDataCols
)
>=
minKey
&&
dataColsKeyLast
(
pDataCols
)
<=
maxKey
);
int
rowsWritten
=
tsdbWriteDataBlock
(
pHelper
,
pDataCols
);
ASSERT
(
rowsWritten
!=
0
);
if
(
rowsWritten
<
0
)
{
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
tsdbError
(
"vgId:%d failed to write data block to table %s tid %d uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pIter
->
pTable
),
TABLE_TID
(
pIter
->
pTable
),
TABLE_UID
(
pIter
->
pTable
),
tstrerror
(
terrno
));
goto
_err
;
}
ASSERT
(
rowsWritten
<=
pDataCols
->
numOfRows
);
tdPopDataColsPoints
(
pDataCols
,
rowsWritten
);
maxRowsToRead
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
-
pDataCols
->
numOfRows
;
if
(
tsdbCommitTableData
(
pHelper
,
pIter
,
pDataCols
,
maxKey
)
<
0
)
{
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
tsdbError
(
"vgId:%d failed to write data of table %s tid %d uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pIter
->
pTable
),
TABLE_TID
(
pIter
->
pTable
),
TABLE_UID
(
pIter
->
pTable
),
tstrerror
(
terrno
));
goto
_err
;
}
ASSERT
(
pDataCols
->
numOfRows
==
0
);
}
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
...
...
@@ -615,7 +636,7 @@ _err:
return
-
1
;
}
static
SCommitIter
*
tsdbCreate
Table
Iters
(
STsdbRepo
*
pRepo
)
{
static
SCommitIter
*
tsdbCreate
Commit
Iters
(
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
...
...
@@ -645,21 +666,18 @@ static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) {
goto
_err
;
}
if
(
!
tSkipListIterNext
(
iters
[
i
].
pIter
))
{
terrno
=
TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM
;
goto
_err
;
}
tSkipListIterNext
(
iters
[
i
].
pIter
);
}
}
return
iters
;
_err:
tsdbDestroy
Table
Iters
(
iters
,
pCfg
->
maxTables
);
tsdbDestroy
Commit
Iters
(
iters
,
pCfg
->
maxTables
);
return
NULL
;
}
static
void
tsdbDestroy
Table
Iters
(
SCommitIter
*
iters
,
int
maxTables
)
{
static
void
tsdbDestroy
Commit
Iters
(
SCommitIter
*
iters
,
int
maxTables
)
{
if
(
iters
==
NULL
)
return
;
for
(
int
i
=
1
;
i
<
maxTables
;
i
++
)
{
...
...
@@ -670,35 +688,4 @@ static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) {
}
free
(
iters
);
}
static
int
tsdbReadRowsFromCache
(
STsdbMeta
*
pMeta
,
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
)
{
ASSERT
(
maxRowsToRead
>
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
int
numOfRows
=
0
;
do
{
if
(
numOfRows
>=
maxRowsToRead
)
break
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
break
;
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
if
(
dataRowKey
(
row
)
>
maxKey
)
break
;
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
true
,
false
,
dataRowVersion
(
row
));
if
(
pSchema
==
NULL
)
{
// TODO: deal with the error here
ASSERT
(
0
);
}
}
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pCols
);
numOfRows
++
;
}
while
(
tSkipListIterNext
(
pIter
));
return
numOfRows
;
}
\ No newline at end of file
src/tsdb/src/tsdbMeta.c
浏览文件 @
a3714edf
...
...
@@ -727,7 +727,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
T_REF_INC
(
pTable
);
tsdb
Debug
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
tsdb
Trace
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
return
pTable
;
...
...
@@ -740,7 +740,7 @@ _err:
static
void
tsdbFreeTable
(
STable
*
pTable
)
{
if
(
pTable
)
{
if
(
pTable
->
name
!=
NULL
)
tsdb
Debug
(
"table %s tid %d uid %"
PRIu64
" is destroy
ed"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
tsdb
Trace
(
"table %s tid %d uid %"
PRIu64
" is fre
ed"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
tfree
(
TABLE_NAME
(
pTable
));
if
(
TABLE_TYPE
(
pTable
)
!=
TSDB_CHILD_TABLE
)
{
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
src/tsdb/src/tsdbRead.c
浏览文件 @
a3714edf
...
...
@@ -179,7 +179,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle
->
outputCapacity
=
((
STsdbRepo
*
)
tsdb
)
->
config
.
maxRowsPerFileBlock
;
pQueryHandle
->
allocSize
=
0
;
tsdbInitReadHelper
(
&
pQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
tsdb
);
if
(
tsdbInitReadHelper
(
&
pQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
tsdb
)
!=
0
)
{
free
(
pQueryHandle
);
return
NULL
;
}
tsdbTakeMemSnapshot
(
pQueryHandle
->
pTsdb
,
&
pQueryHandle
->
mem
,
&
pQueryHandle
->
imem
);
size_t
sizeOfGroup
=
taosArrayGetSize
(
groupList
->
pGroupList
);
...
...
@@ -238,11 +241,11 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
TsdbQueryHandleT
tsdbQueryLastRow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
tsdbQueryTables
(
tsdb
,
pCond
,
groupList
,
qinfo
);
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_LAST
;
pQueryHandle
->
order
=
TSDB_ORDER_DESC
;
changeQueryHandleForLastrowQuery
(
pQueryHandle
);
if
(
pQueryHandle
!=
NULL
)
{
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_LAST
;
pQueryHandle
->
order
=
TSDB_ORDER_DESC
;
changeQueryHandleForLastrowQuery
(
pQueryHandle
);
}
return
pQueryHandle
;
}
...
...
@@ -264,9 +267,10 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
TsdbQueryHandleT
tsdbQueryRowsInExternalWindow
(
TSDB_REPO_T
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
void
*
qinfo
)
{
STsdbQueryHandle
*
pQueryHandle
=
(
STsdbQueryHandle
*
)
tsdbQueryTables
(
tsdb
,
pCond
,
groupList
,
qinfo
);
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_EXTERNAL
;
changeQueryHandleForInterpQuery
(
pQueryHandle
);
if
(
pQueryHandle
!=
NULL
)
{
pQueryHandle
->
type
=
TSDB_QUERY_TYPE_EXTERNAL
;
changeQueryHandleForInterpQuery
(
pQueryHandle
);
}
return
pQueryHandle
;
}
...
...
@@ -1522,7 +1526,10 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle
->
activeIndex
=
0
;
pSecQueryHandle
->
outputCapacity
=
((
STsdbRepo
*
)
pSecQueryHandle
->
pTsdb
)
->
config
.
maxRowsPerFileBlock
;
tsdbInitReadHelper
(
&
pSecQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
pSecQueryHandle
->
pTsdb
);
if
(
tsdbInitReadHelper
(
&
pSecQueryHandle
->
rhelper
,
(
STsdbRepo
*
)
pSecQueryHandle
->
pTsdb
)
!=
0
)
{
free
(
pSecQueryHandle
);
return
false
;
}
tsdbTakeMemSnapshot
(
pSecQueryHandle
->
pTsdb
,
&
pSecQueryHandle
->
mem
,
&
pSecQueryHandle
->
imem
);
// allocate buffer in order to load data blocks from file
...
...
@@ -1606,7 +1613,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
}
// TODO: opt by consider the scan order
return
doHasDataInBuffer
(
pQueryHandle
);
bool
ret
=
doHasDataInBuffer
(
pQueryHandle
);
terrno
=
TSDB_CODE_SUCCESS
;
return
ret
;
}
void
changeQueryHandleForLastrowQuery
(
TsdbQueryHandleT
pqHandle
)
{
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
src/wal/src/walMain.c
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/pytest/crash_gen.py
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/pytest/util/dnodes-no-random-fail.py
0 → 100644
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/pytest/util/dnodes-random-fail.py
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/script/sh/deploy.sh
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/script/sh/exec-no-random-fail.sh
0 → 100755
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/script/sh/exec-random-fail.sh
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/script/tmp/mnodes.sim
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
tests/test/c/createTablePerformance.c
浏览文件 @
a3714edf
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录