Unverified commit e7a6433d
Authored by Xuan Ronaldo on Jun 26, 2023; committed via GitHub on Jun 26, 2023.

iotdb-connector: fix bugs & smells reported by SonarCloud (#10310)

Parent: 95689678
Showing 26 changed files with 135 additions and 146 deletions (+135 −146).
iotdb-connector/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java  +2 −0
iotdb-connector/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java  +2 −2
iotdb-connector/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java  +4 −3
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java  +1 −1
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java  +2 −2
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java  +1 −0
iotdb-connector/grafana-connector/src/main/java/org/apache/iotdb/web/grafana/controller/DatabaseConnectController.java  +2 −2
iotdb-connector/grafana-connector/src/main/java/org/apache/iotdb/web/grafana/dao/impl/BasicDaoImpl.java  +3 −2
iotdb-connector/grafana-connector/src/test/java/org/apache/iotdb/web/grafana/dao/impl/BasicDaoImplTest.java  +5 −4
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java  +2 −0
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java  +6 −4
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java  +5 −4
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFOutputFormat.java  +2 −2
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java  +2 −2
iotdb-connector/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveRecordWriter.java  +1 −2
iotdb-connector/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java  +0 −4
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/Executor.java  +1 −0
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/QueryProcessor.java  +9 −12
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/common/FilterOperator.java  +9 −1
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/common/SQLConstant.java  +56 −63
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/exception/QueryProcessorException.java  +1 −1
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/DNFFilterOptimizer.java  +7 −8
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/MergeSingleFilterOptimizer.java  +0 −10
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/PhysicalOptimizer.java  +10 −10
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/RemoveNotOptimizer.java  +0 −5
iotdb-connector/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala  +2 −2
iotdb-connector/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java

@@ -80,6 +80,8 @@ public class DefaultIoTSerializationSchema implements IoTSerializationSchema<Map
         case BOOLEAN:
           values.add(Boolean.parseBoolean(valuesStr[i]));
           break;
+        default:
+          continue;
       }
     }
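The two added lines give the switch the default clause SonarCloud asks for, so a value whose type is not handled is skipped deliberately instead of falling through silently. A self-contained sketch of the pattern; the method and type strings are invented, not taken from the patch:

import java.util.List;

class SerializationSketch {
  // Sonar java:S131: every switch ends with an explicit default branch.
  static void addValue(List<Object> values, String type, String raw) {
    switch (type) {
      case "INT32":
        values.add(Integer.parseInt(raw));
        break;
      case "BOOLEAN":
        values.add(Boolean.parseBoolean(raw));
        break;
      default:
        break; // unknown type: ignore the field rather than fall through silently
    }
  }
}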
iotdb-connector/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/IoTDBSink.java

@@ -57,7 +57,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
   private int batchSize = 0;
   private int flushIntervalMs = 3000;
-  private List<Event> batchList;
+  private transient List<Event> batchList;
   private int sessionPoolSize = 2;

   public IoTDBSink(IoTDBSinkOptions options, IoTSerializationSchema<IN> schema) {

@@ -188,7 +188,7 @@ public class IoTDBSink<IN> extends RichSinkFunction<IN> {
   private void flush() throws Exception {
     if (batchSize > 0) {
       synchronized (batchList) {
-        if (batchList.size() > 0) {
+        if (!batchList.isEmpty()) {
           List<String> deviceIds = new ArrayList<>();
           List<Long> timestamps = new ArrayList<>();
           List<List<String>> measurementsList = new ArrayList<>();
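A RichSinkFunction instance is Java-serialized when the job graph is shipped to the task managers, so runtime-only state such as the batch buffer is marked transient and rebuilt on the worker; the same reasoning is behind the transient fields added to TsFileOutputFormat and HDFSFile below. A minimal sketch of the pattern, with invented names:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

// Sketch: serializable configuration travels with the job; mutable runtime
// state stays transient and is recreated per task in open().
public class BufferingSink<IN> extends RichSinkFunction<IN> {
  private final int batchSize;      // shipped with the serialized function
  private transient List<IN> batch; // not shipped; rebuilt below

  public BufferingSink(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public void open(Configuration parameters) {
    batch = new ArrayList<>(); // initialize transient state on the worker
  }

  @Override
  public void invoke(IN value, Context context) {
    batch.add(value);
    if (batch.size() >= batchSize) {
      // flush the batch to the external system here
      batch.clear();
    }
  }
}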
iotdb-connector/flink-iotdb-connector/src/test/java/org/apache/iotdb/flink/IoTDBSinkBatchTimerTest.java

@@ -28,6 +28,8 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;

 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;

@@ -62,14 +64,13 @@ public class IoTDBSinkBatchTimerTest {
     tuple.put("types", "DOUBLE");
     tuple.put("values", "36.5");

     ioTDBSink.invoke(tuple, null);
-    Thread.sleep(2500);
+    new CountDownLatch(1).await(2500, TimeUnit.MILLISECONDS);
     verify(pool)
         .insertRecords(
             any(List.class), any(List.class), any(List.class), any(List.class), any(List.class));
-    Thread.sleep(1000);
+    new CountDownLatch(1).await(1000, TimeUnit.MILLISECONDS);
     verifyZeroInteractions(pool);
   }
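Thread.sleep in tests is a SonarCloud smell; awaiting a CountDownLatch with a timeout blocks for at most the same interval, and a latch that the code under test counts down lets the wait end as soon as the event fires (the patched test still waits the full timeout, since nothing counts its latch down). A runnable sketch, with invented names:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch flushed = new CountDownLatch(1);

    // Stand-in for an asynchronous flusher signalling completion.
    new Thread(flushed::countDown).start();

    // true as soon as countDown() runs; false only after the full 2.5 s.
    boolean signalled = flushed.await(2500, TimeUnit.MILLISECONDS);
    System.out.println("flush signalled: " + signalled);
  }
}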
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java

@@ -150,7 +150,7 @@ public class RowTSRecordConverter implements TSRecordConverter<Row> {
       }
     }
     for (TSRecord tsRecord : reuse) {
-      if (tsRecord.dataPointList.size() > 0) {
+      if (!tsRecord.dataPointList.isEmpty()) {
         tsRecord.setTime(timestamp);
         collector.collect(tsRecord);
       }
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/TsFileOutputFormat.java

@@ -53,7 +53,7 @@ public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> {
   @Nullable protected TSFileConfig config;

   protected transient Configuration hadoopConf = null;
-  private FileOutputStream fos = null;
+  private transient FileOutputStream fos = null;
   protected transient TsFileWriter writer = null;

   protected TsFileOutputFormat(String path, Schema schema, TSFileConfig config) {

@@ -77,7 +77,7 @@ public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> {
     // Use TsFile API to write instead of FSDataOutputStream.
     this.stream.close();
     Path actualFilePath = getAcutalFilePath();
-    TsFileOutput out;
+    TsFileOutput out = null;
     try {
       if (actualFilePath.getFileSystem().isDistributedFS()) {
         // HDFS
iotdb-connector/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/util/TSFileConfigUtil.java

@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;

 /** Utils of setting global TSFileConfig. */
 public class TSFileConfigUtil {
+  private TSFileConfigUtil() {}

   public static void setGlobalTSFileConfig(TSFileConfig config) {
     TSFileConfig globalConfig = TSFileDescriptor.getInstance().getConfig();
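Together with the identical one-liners added to HDFSConfUtil, Executor and SQLConstant further down, this applies the standard fix for the "utility classes should not have public constructors" rule: a private constructor makes a static-only class uninstantiable. Sketch with a hypothetical class:

public final class MathUtil {
  private MathUtil() {} // static-only: no instances, nothing to subclass

  public static int clamp(int v, int lo, int hi) {
    return Math.max(lo, Math.min(hi, v));
  }
}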
iotdb-connector/grafana-connector/src/main/java/org/apache/iotdb/web/grafana/controller/DatabaseConnectController.java

@@ -33,9 +33,9 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.CrossOrigin;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;

@@ -59,7 +59,7 @@ public class DatabaseConnectController {
   @Autowired private DatabaseConnectService databaseConnectService;

-  @RequestMapping(value = "/", method = RequestMethod.GET)
+  @GetMapping(value = "/")
   @ResponseStatus(value = HttpStatus.OK)
   public void testDataConnection(HttpServletResponse response) throws IOException {
     logger.info("Connection is ok now!");
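@GetMapping is the composed shorthand for @RequestMapping(method = RequestMethod.GET) that Spring has offered since 4.3; it drops the easy-to-mistype method element. A minimal controller sketch with a hypothetical /ping endpoint, not the Grafana controller itself:

import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseStatus;

@Controller
public class PingController {
  // Equivalent to @RequestMapping(value = "/ping", method = RequestMethod.GET).
  @GetMapping("/ping")
  @ResponseStatus(HttpStatus.OK)
  public void ping() {
    // connectivity probe; intentionally empty
  }
}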
iotdb-connector/grafana-connector/src/main/java/org/apache/iotdb/web/grafana/dao/impl/BasicDaoImpl.java

@@ -101,7 +101,8 @@ public class BasicDaoImpl implements BasicDao {
       default:
         timestampRadioX = 1L;
     }
-    logger.info("Use timestamp precision {}", timestampPrecision.replaceAll("[\n\r\t]", "_"));
+    timestampPrecision = timestampPrecision.replaceAll("[\n\r\t]", "_");
+    logger.info("Use timestamp precision {}", timestampPrecision);
   }

   /**

@@ -164,7 +165,7 @@ public class BasicDaoImpl implements BasicDao {
   }

   public String getInterval(final long hours) {
-    if (!isDownSampling || !(hours > 1)) {
+    if (!isDownSampling || hours <= 1) {
       return "";
     }
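timestampPrecision comes from configuration, so CR/LF/TAB characters are replaced before the value is logged, preventing a crafted value from injecting fake log lines; writing the sanitized string back into the variable also means later uses don't repeat the replaceAll. A standalone sketch with an invented class name:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogSanitizeSketch {
  private static final Logger logger = LoggerFactory.getLogger(LogSanitizeSketch.class);

  // Strip characters that could start a forged log line.
  static String sanitize(String external) {
    return external == null ? null : external.replaceAll("[\n\r\t]", "_");
  }

  public static void main(String[] args) {
    String precision = sanitize("ms\nFAKE LOG ENTRY");
    logger.info("Use timestamp precision {}", precision); // one line: ms_FAKE LOG ENTRY
  }
}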
iotdb-connector/grafana-connector/src/test/java/org/apache/iotdb/web/grafana/dao/impl/BasicDaoImplTest.java

@@ -19,6 +19,7 @@
 package org.apache.iotdb.web.grafana.dao.impl;

 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.springframework.test.util.ReflectionTestUtils;

@@ -38,15 +39,15 @@ public class BasicDaoImplTest {
     ReflectionTestUtils.setField(impl, "interval", "1m");
     String interval1 = impl.getInterval(0);
-    assert interval1.equals("");
+    Assert.assertEquals("", interval1);
     String interval2 = impl.getInterval(3);
-    assert interval2.equals("1m");
+    Assert.assertEquals("1m", interval2);
     String interval3 = impl.getInterval(25);
-    assert interval3.equals("1h");
+    Assert.assertEquals("1h", interval3);
     String interval4 = impl.getInterval(24 * 30 + 1);
-    assert interval4.equals("1d");
+    Assert.assertEquals("1d", interval4);
   }
 }
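The bare assert keyword only executes when the JVM is started with -ea, so the old checks could silently pass; Assert.assertEquals always runs and prints expected vs. actual on failure. A sketch with a hypothetical test:

import org.junit.Assert;
import org.junit.Test;

public class IntervalAssertSketch {
  @Test
  public void comparesExplicitly() {
    String interval = "1m";
    // assert interval.equals("1h");     // a no-op unless the JVM gets -ea
    Assert.assertEquals("1m", interval); // always evaluated and reported
  }
}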
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java

@@ -35,6 +35,8 @@ import java.net.MalformedURLException;

 class HDFSConfUtil {
+  private HDFSConfUtil() {}
+
   private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();

   private static final Logger logger = LoggerFactory.getLogger(HDFSConfUtil.class);
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import javax.annotation.Nullable;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;

@@ -48,8 +50,8 @@ import java.util.List;
 public class HDFSFile extends File {

   private static final long serialVersionUID = -8419827359081949547L;
-  private Path hdfsPath;
-  private FileSystem fs;
+  private transient Path hdfsPath;
+  private transient FileSystem fs;
   private static final Logger logger = LoggerFactory.getLogger(HDFSFile.class);
   private static final String UNSUPPORT_OPERATION = "Unsupported operation.";

@@ -117,7 +119,7 @@ public class HDFSFile extends File {
   }

   @Override
-  public File[] listFiles() {
+  public @Nullable File[] listFiles() {
     List<HDFSFile> files = new ArrayList<>();
     try {
       for (FileStatus fileStatus : fs.listStatus(hdfsPath)) {

@@ -230,7 +232,7 @@ public class HDFSFile extends File {
     }
   }

-  public BufferedWriter getBufferedWriter(String filePath, boolean append) {
+  public BufferedWriter getBufferedWriter(String filePath) {
     try {
       return new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filePath))));
     } catch (IOException e) {
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFInputFormat.java

@@ -260,17 +260,18 @@ public class TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
       if (length > 0) {
         FileSystem fileSystem = path.getFileSystem(configuration);
         logger.info("The file status is {}", fileStatus.getClass().getName());
-        logger.info("The file system is " + fileSystem.getClass());
+        logger.info("The file system is {}", fileSystem.getClass());
         blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, length);
-        logger.info("The block location information is {}", Arrays.toString(blockLocations));
+        String locationInfo = Arrays.toString(blockLocations);
+        logger.info("The block location information is {}", locationInfo);
         splits.addAll(generateSplits(path, blockLocations));
       } else {
-        logger.warn("The file length is " + length);
+        logger.warn("The file length is {}", length);
       }
     }
     configuration.setLong(NUM_INPUT_FILES, listFileStatus.size());
-    logger.info("The number of splits is " + splits.size());
+    logger.info("The number of splits is {}", splits.size());
     return splits;
   }
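The hadoop-module loggers switch from string concatenation to SLF4J {} placeholders, so the final message is only formatted when INFO is actually enabled; argument expressions are still evaluated eagerly, which is presumably why Arrays.toString(blockLocations) is hoisted into a named local rather than left inline. A runnable sketch of the difference, with an invented class name:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParamLoggingSketch {
  private static final Logger logger = LoggerFactory.getLogger(ParamLoggingSketch.class);

  public static void main(String[] args) {
    int splits = 42;
    logger.info("The number of splits is " + splits);  // builds the string even if INFO is off
    logger.info("The number of splits is {}", splits); // formats only when INFO is enabled
  }
}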
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFOutputFormat.java

@@ -35,7 +35,7 @@ import java.util.Objects;
 public class TSFOutputFormat extends FileOutputFormat<NullWritable, HDFSTSRecord> {

   private static final Logger logger = LoggerFactory.getLogger(TSFOutputFormat.class);
-  private static final String extension = "";
+  private static final String EXTENSION = "";
   private static Schema schema;

   public static Schema getSchema() {

@@ -50,7 +50,7 @@ public class TSFOutputFormat extends FileOutputFormat<NullWritable, HDFSTSRecord
   public RecordWriter<NullWritable, HDFSTSRecord> getRecordWriter(TaskAttemptContext job)
       throws IOException {
-    Path outputPath = getDefaultWorkFile(job, extension);
+    Path outputPath = getDefaultWorkFile(job, EXTENSION);
     logger.info(
         "The task attempt id is {}, the output path is {}", job.getTaskAttemptID(), outputPath);
     return new TSFRecordWriter(job, outputPath, Objects.requireNonNull(schema));
iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java

@@ -102,8 +102,8 @@ public class TSFRecordReader extends RecordReader<NullWritable, MapWritable> imp
     List<String> deviceIds = TSFInputFormat.getReadDeviceIds(configuration);
     List<String> measurementIds = TSFInputFormat.getReadMeasurementIds(configuration);
     readerSet.setMeasurementIds(measurementIds);
-    logger.info("deviceIds: " + deviceIds);
-    logger.info("Sensors: " + measurementIds);
+    logger.info("deviceIds: {}", deviceIds);
+    logger.info("Sensors: {}", measurementIds);
     readerSet.setReadDeviceId(TSFInputFormat.getReadDeviceId(configuration));
     readerSet.setReadTime(TSFInputFormat.getReadTime(configuration));
iotdb-connector/hive-connector/src/main/java/org/apache/iotdb/hive/TSFHiveRecordWriter.java

@@ -19,7 +19,6 @@
 package org.apache.iotdb.hive;

 import org.apache.iotdb.hadoop.fileSystem.HDFSOutput;
-import org.apache.iotdb.hadoop.tsfile.TSFRecordWriter;
 import org.apache.iotdb.hadoop.tsfile.record.HDFSTSRecord;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.write.TsFileWriter;

@@ -40,7 +39,7 @@ import java.io.IOException;
 */
 public class TSFHiveRecordWriter implements FileSinkOperator.RecordWriter {

-  private static final Logger logger = LoggerFactory.getLogger(TSFRecordWriter.class);
+  private static final Logger logger = LoggerFactory.getLogger(TSFHiveRecordWriter.class);

   private TsFileWriter writer;
iotdb-connector/hive-connector/src/main/java/org/apache/iotdb/hive/TsFileSerDe.java

@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Writable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;

@@ -50,8 +48,6 @@ import static org.apache.iotdb.hadoop.tsfile.TSFInputFormat.READ_MEASUREMENTID;
 public class TsFileSerDe extends AbstractSerDe {

-  private static final Logger logger = LoggerFactory.getLogger(TsFileSerDe.class);
-
   public static final String DEVICE_ID = "device_id";

   private List<String> columnNames;
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/Executor.java

@@ -28,6 +28,7 @@ import java.util.List;

 /** This class used to execute Queries on TSFile */
 public class Executor {
+  private Executor() {}

   public static List<QueryDataSet> query(
       TsFileReader reader, List<QueryExpression> queryExpressions, long start, long end) {
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/QueryProcessor.java

@@ -129,7 +129,7 @@ public class QueryProcessor {
       return ret;
     }
     // a list of conjunctions linked by or
-    return filterOperator.childOperators;
+    return filterOperator.getChildOperators();
   }

   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning

@@ -169,16 +169,13 @@ public class QueryProcessor {
               "The same key filter has been specified more than once: " + singlePath);
         }
       } else {
-        switch (child.getSinglePath()) {
-          case SQLConstant.RESERVED_TIME:
-            if (timeFilter != null) {
-              throw new QueryOperatorException("time filter has been specified more than once");
-            }
-            timeFilter = child;
-            break;
-          default:
-            valueList.add(child);
-            break;
+        if (SQLConstant.RESERVED_TIME.equals(child.getSinglePath())) {
+          if (timeFilter != null) {
+            throw new QueryOperatorException("time filter has been specified more than once");
+          }
+          timeFilter = child;
+        } else {
+          valueList.add(child);
         }
       }

@@ -189,7 +186,7 @@ public class QueryProcessor {
     } else if (valueList.size() > 1) {
       valueFilter = new FilterOperator(SQLConstant.KW_AND, false);
-      valueFilter.childOperators = valueList;
+      valueFilter.setChildOperators(valueList);
     }

     return new SingleQuery(columnFilterOperators, timeFilter, valueFilter);
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/common/FilterOperator.java

@@ -30,7 +30,7 @@ import java.util.List;
 */
 public class FilterOperator extends Operator implements Comparable<FilterOperator> {
-  public List<FilterOperator> childOperators;
+  protected List<FilterOperator> childOperators;
   // leaf filter operator means it doesn't have left and right child filterOperator. Leaf filter
   // should be BasicOperator.
   protected boolean isLeaf = false;

@@ -95,6 +95,14 @@ public class FilterOperator extends Operator implements Comparable<FilterOperato
     return singlePath;
   }

+  public List<FilterOperator> getChildOperators() {
+    return childOperators;
+  }
+
+  public void setChildOperators(List<FilterOperator> childOperators) {
+    this.childOperators = childOperators;
+  }
+
   public void addChildOPerator(FilterOperator op) {
     childOperators.add(op);
   }
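childOperators narrows from public to protected and gains accessors; QueryProcessor above and NarrowConverter.scala at the end of the patch are the callers that switch over. A minimal sketch of the refactoring with a hypothetical Node class:

import java.util.ArrayList;
import java.util.List;

// Every read/write now goes through one seam, so invariants (null checks,
// defensive copies) can be added later without touching callers again.
class Node {
  protected List<Node> children = new ArrayList<>();

  public List<Node> getChildren() {
    return children;
  }

  public void setChildren(List<Node> children) {
    this.children = children;
  }
}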
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/common/SQLConstant.java

@@ -23,6 +23,7 @@ import java.util.Map;

 /** this class contains several constants used in SQL. */
 public class SQLConstant {
+  private SQLConstant() {}

   public static final String DELTA_OBJECT_NAME = "delta_object_name";
   public static final String REGEX_PATH_SEPARATOR = "\\.";

@@ -72,70 +73,62 @@ public class SQLConstant {
   public static final int TOK_PROPERTY_LINK = 57;
   public static final int TOK_PROPERTY_UNLINK = 58;

-  public static final Map<Integer, String> tokenSymbol =
-      new HashMap<Integer, String>() {
-        {
-          put(KW_AND, "&");
-          put(KW_OR, "|");
-          put(KW_NOT, "!");
-          put(EQUAL, "=");
-          put(NOTEQUAL, "<>");
-          put(EQUAL_NS, "<=>");
-          put(LESSTHANOREQUALTO, "<=");
-          put(LESSTHAN, "<");
-          put(GREATERTHANOREQUALTO, ">=");
-          put(GREATERTHAN, ">");
-        }
-      };
-  public static final Map<Integer, String> tokenNames =
-      new HashMap<Integer, String>() {
-        {
-          put(KW_AND, "and");
-          put(KW_OR, "or");
-          put(KW_NOT, "not");
-          put(EQUAL, "equal");
-          put(NOTEQUAL, "not_equal");
-          put(EQUAL_NS, "equal_ns");
-          put(LESSTHANOREQUALTO, "lessthan_or_equalto");
-          put(LESSTHAN, "lessthan");
-          put(GREATERTHANOREQUALTO, "greaterthan_or_equalto");
-          put(GREATERTHAN, "greaterthan");
-          put(TOK_SELECT, "TOK_SELECT");
-          put(TOK_FROM, "TOK_FROM");
-          put(TOK_WHERE, "TOK_WHERE");
-          put(TOK_INSERT, "TOK_INSERT");
-          put(TOK_DELETE, "TOK_DELETE");
-          put(TOK_UPDATE, "TOK_UPDATE");
-          put(TOK_QUERY, "TOK_QUERY");
-          put(TOK_AUTHOR_CREATE, "TOK_AUTHOR_CREATE");
-          put(TOK_AUTHOR_DROP, "TOK_AUTHOR_DROP");
-          put(TOK_AUTHOR_GRANT, "TOK_AUTHOR_GRANT");
-          put(TOK_AUTHOR_REVOKE, "TOK_AUTHOR_REVOKE");
-          put(TOK_DATALOAD, "TOK_DATALOAD");
-          put(TOK_METADATA_CREATE, "TOK_METADATA_CREATE");
-          put(TOK_METADATA_DELETE, "TOK_METADATA_DELETE");
-          put(TOK_METADATA_SET_FILE_LEVEL, "TOK_METADATA_SET_FILE_LEVEL");
-          put(TOK_PROPERTY_CREATE, "TOK_PROPERTY_CREATE");
-          put(TOK_PROPERTY_ADD_LABEL, "TOK_PROPERTY_ADD_LABEL");
-          put(TOK_PROPERTY_DELETE_LABEL, "TOK_PROPERTY_DELETE_LABEL");
-          put(TOK_PROPERTY_LINK, "TOK_PROPERTY_LINK");
-          put(TOK_PROPERTY_UNLINK, "TOK_PROPERTY_UNLINK");
-        }
-      };
-  public static final Map<Integer, Integer> reverseWords =
-      new HashMap<Integer, Integer>() {
-        {
-          put(KW_AND, KW_OR);
-          put(KW_OR, KW_AND);
-          put(EQUAL, NOTEQUAL);
-          put(NOTEQUAL, EQUAL);
-          put(LESSTHAN, GREATERTHANOREQUALTO);
-          put(GREATERTHANOREQUALTO, LESSTHAN);
-          put(LESSTHANOREQUALTO, GREATERTHAN);
-          put(GREATERTHAN, LESSTHANOREQUALTO);
-        }
-      };
+  protected static final Map<Integer, String> tokenSymbol = new HashMap<Integer, String>() {};
+  public static final Map<Integer, String> tokenNames = new HashMap<Integer, String>() {};
+  public static final Map<Integer, Integer> reverseWords = new HashMap<Integer, Integer>() {};
+
+  static {
+    tokenSymbol.put(KW_AND, "&");
+    tokenSymbol.put(KW_OR, "|");
+    tokenSymbol.put(KW_NOT, "!");
+    tokenSymbol.put(EQUAL, "=");
+    tokenSymbol.put(NOTEQUAL, "<>");
+    tokenSymbol.put(EQUAL_NS, "<=>");
+    tokenSymbol.put(LESSTHANOREQUALTO, "<=");
+    tokenSymbol.put(LESSTHAN, "<");
+    tokenSymbol.put(GREATERTHANOREQUALTO, ">=");
+    tokenSymbol.put(GREATERTHAN, ">");
+
+    tokenNames.put(KW_AND, "and");
+    tokenNames.put(KW_OR, "or");
+    tokenNames.put(KW_NOT, "not");
+    tokenNames.put(EQUAL, "equal");
+    tokenNames.put(NOTEQUAL, "not_equal");
+    tokenNames.put(EQUAL_NS, "equal_ns");
+    tokenNames.put(LESSTHANOREQUALTO, "lessthan_or_equalto");
+    tokenNames.put(LESSTHAN, "lessthan");
+    tokenNames.put(GREATERTHANOREQUALTO, "greaterthan_or_equalto");
+    tokenNames.put(GREATERTHAN, "greaterthan");
+    tokenNames.put(TOK_SELECT, "TOK_SELECT");
+    tokenNames.put(TOK_FROM, "TOK_FROM");
+    tokenNames.put(TOK_WHERE, "TOK_WHERE");
+    tokenNames.put(TOK_INSERT, "TOK_INSERT");
+    tokenNames.put(TOK_DELETE, "TOK_DELETE");
+    tokenNames.put(TOK_UPDATE, "TOK_UPDATE");
+    tokenNames.put(TOK_QUERY, "TOK_QUERY");
+    tokenNames.put(TOK_AUTHOR_CREATE, "TOK_AUTHOR_CREATE");
+    tokenNames.put(TOK_AUTHOR_DROP, "TOK_AUTHOR_DROP");
+    tokenNames.put(TOK_AUTHOR_GRANT, "TOK_AUTHOR_GRANT");
+    tokenNames.put(TOK_AUTHOR_REVOKE, "TOK_AUTHOR_REVOKE");
+    tokenNames.put(TOK_DATALOAD, "TOK_DATALOAD");
+    tokenNames.put(TOK_METADATA_CREATE, "TOK_METADATA_CREATE");
+    tokenNames.put(TOK_METADATA_DELETE, "TOK_METADATA_DELETE");
+    tokenNames.put(TOK_METADATA_SET_FILE_LEVEL, "TOK_METADATA_SET_FILE_LEVEL");
+    tokenNames.put(TOK_PROPERTY_CREATE, "TOK_PROPERTY_CREATE");
+    tokenNames.put(TOK_PROPERTY_ADD_LABEL, "TOK_PROPERTY_ADD_LABEL");
+    tokenNames.put(TOK_PROPERTY_DELETE_LABEL, "TOK_PROPERTY_DELETE_LABEL");
+    tokenNames.put(TOK_PROPERTY_LINK, "TOK_PROPERTY_LINK");
+    tokenNames.put(TOK_PROPERTY_UNLINK, "TOK_PROPERTY_UNLINK");
+
+    reverseWords.put(KW_AND, KW_OR);
+    reverseWords.put(KW_OR, KW_AND);
+    reverseWords.put(EQUAL, NOTEQUAL);
+    reverseWords.put(NOTEQUAL, EQUAL);
+    reverseWords.put(LESSTHAN, GREATERTHANOREQUALTO);
+    reverseWords.put(GREATERTHANOREQUALTO, LESSTHAN);
+    reverseWords.put(LESSTHANOREQUALTO, GREATERTHAN);
+    reverseWords.put(GREATERTHAN, LESSTHANOREQUALTO);
+  }

   public static boolean isReservedPath(String pathStr) {
     return pathStr.equals(SQLConstant.RESERVED_TIME)
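The replaced initializers were "double brace" blocks: each defines an anonymous HashMap subclass whose instance initializer performs the puts, a pattern SonarCloud flags. The new code keeps plain map fields and fills them once in a static block at class-load time. A compact sketch of both forms, with a hypothetical table:

import java.util.HashMap;
import java.util.Map;

public class TokenTables {
  // Avoided: new HashMap<Integer, String>() {{ put(1, "and"); }};
  // That allocates an extra anonymous class per map just to run the puts.

  public static final Map<Integer, String> tokenNames = new HashMap<>();

  static {
    // Same effect, one plain HashMap, populated when the class is loaded.
    tokenNames.put(1, "and");
    tokenNames.put(2, "or");
  }
}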
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/exception/QueryProcessorException.java

@@ -23,7 +23,7 @@ public class QueryProcessorException extends Exception {

   private static final long serialVersionUID = -8987915921329335088L;
-  private String errMsg;
+  private final String errMsg;

   QueryProcessorException(String msg) {
     super(msg);
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/DNFFilterOptimizer.java

@@ -113,14 +113,13 @@ public class DNFFilterOptimizer implements IFilterOptimizer {
   * @return List<FilterOperator>
   */
  private List<FilterOperator> getAndChild(FilterOperator child) {
-    switch (child.getTokenIntType()) {
-      case KW_OR:
-        return child.getChildren();
-      default:
-        // other token type means leaf node or "and" operator
-        List<FilterOperator> ret = new ArrayList<>();
-        ret.add(child);
-        return ret;
+    if (KW_OR == child.getTokenIntType()) {
+      return child.getChildren();
+    } else {
+      // other token type means leaf node or "and" operator
+      List<FilterOperator> ret = new ArrayList<>();
+      ret.add(child);
+      return ret;
     }
   }
...
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/MergeSingleFilterOptimizer.java
浏览文件 @
e7a6433d
...
...
@@ -18,7 +18,6 @@
*/
package
org.apache.iotdb.spark.tsfile.qp.optimizer
;
import
org.apache.iotdb.spark.tsfile.qp.common.BasicOperator
;
import
org.apache.iotdb.spark.tsfile.qp.common.FilterOperator
;
import
org.apache.iotdb.spark.tsfile.qp.exception.MergeFilterException
;
...
...
@@ -131,13 +130,4 @@ public class MergeSingleFilterOptimizer implements IFilterOptimizer {
return
null
;
}
}
private
boolean
allIsBasic
(
List
<
FilterOperator
>
children
)
{
for
(
FilterOperator
child
:
children
)
{
if
(!(
child
instanceof
BasicOperator
))
{
return
false
;
}
}
return
true
;
}
}
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/PhysicalOptimizer.java

@@ -99,7 +99,7 @@ public class PhysicalOptimizer {
     }

     // query all measurements from TSFile
-    if (selectedSeries.size() == 0) {
+    if (selectedSeries.isEmpty()) {
       selectedSeries.addAll(allMeasurementsInFile.keySet());
     } else {
       // remove paths that doesn't exist in file

@@ -143,10 +143,10 @@ public class PhysicalOptimizer {
     // which should in column names -> now just device_name
     // use delta_object column
     if (columnValues.containsKey(SQLConstant.RESERVED_DELTA_OBJECT)) {
-      Set<String> delta_objects = columnValues.get(SQLConstant.RESERVED_DELTA_OBJECT);
-      for (String delta_object : delta_objects) {
-        if (actualDeltaObjects.contains(delta_object)) {
-          validDeltaObjects.add(delta_object);
+      Set<String> deltaObjects = columnValues.get(SQLConstant.RESERVED_DELTA_OBJECT);
+      for (String deltaObject : deltaObjects) {
+        if (actualDeltaObjects.contains(deltaObject)) {
+          validDeltaObjects.add(deltaObject);
         }
       }
       return;

@@ -178,14 +178,14 @@ public class PhysicalOptimizer {
   }

   private Map<String, Set<String>> mergeColumns(List<FilterOperator> columnFilterOperators) {
-    Map<String, Set<String>> column_values_map = new HashMap<>();
+    Map<String, Set<String>> columnValuesMap = new HashMap<>();
     for (FilterOperator filterOperator : columnFilterOperators) {
-      Pair<String, Set<String>> column_values = mergeColumn(filterOperator);
-      if (column_values != null && !column_values.right.isEmpty()) {
-        column_values_map.put(column_values.left, column_values.right);
+      Pair<String, Set<String>> columnValues = mergeColumn(filterOperator);
+      if (columnValues != null && !columnValues.right.isEmpty()) {
+        columnValuesMap.put(columnValues.left, columnValues.right);
       }
     }
-    return column_values_map;
+    return columnValuesMap;
   }
iotdb-connector/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/RemoveNotOptimizer.java

@@ -23,9 +23,6 @@ import org.apache.iotdb.spark.tsfile.qp.common.FilterOperator;
 import org.apache.iotdb.spark.tsfile.qp.common.SQLConstant;
 import org.apache.iotdb.spark.tsfile.qp.exception.RemoveNotException;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.List;

 import static org.apache.iotdb.spark.tsfile.qp.common.SQLConstant.KW_AND;

@@ -34,8 +31,6 @@ import static org.apache.iotdb.spark.tsfile.qp.common.SQLConstant.KW_OR;
 public class RemoveNotOptimizer implements IFilterOptimizer {

-  private static final Logger LOG = LoggerFactory.getLogger(RemoveNotOptimizer.class);
-
   /**
   * get DNF(disjunctive normal form) for this filter operator tree. Before getDNF, this op tree
   * must be binary, in another word, each non-leaf node has exactly two children.
iotdb-connector/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/NarrowConverter.scala

@@ -310,7 +310,7 @@ object NarrowConverter extends Converter {
         throw new Exception("NOT filter is not supported now")
       case SQLConstant.KW_AND =>
-        node.childOperators.foreach((child: FilterOperator) => {
+        node.getChildOperators.foreach((child: FilterOperator) => {
           if (filter == null) {
             filter = transformFilterToExpression(schema, child, device_name)
           }

@@ -322,7 +322,7 @@ object NarrowConverter extends Converter {
         filter
       case SQLConstant.KW_OR =>
-        node.childOperators.foreach((child: FilterOperator) => {
+        node.getChildOperators.foreach((child: FilterOperator) => {
           if (filter == null) {
             filter = transformFilterToExpression(schema, child, device_name)
           }