Commit 0f417739 authored by 151250176

add log for partition position and add test code for testing partitioned files

Parent 190f20f4
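
In short: the reader function in DefaultSource now logs the byte range [file.start, file.start + file.length) covered by each PartitionedFile, and a new test shrinks spark.sql.files.maxPartitionBytes so that a single TsFile is actually split into several such partitions. Below is a minimal sketch of how to trigger the partitioned read path outside the test suite, assuming a local TsFile and that the data source resolves under its package name org.apache.iotdb.tsfile; the file path is illustrative.

import org.apache.spark.sql.SparkSession

object PartitionedTsFileRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("tsfile-partitioned-read")
      .getOrCreate()

    // Force Spark to cut the file into splits of at most 256 KB,
    // the same value the new test uses.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 256)

    val df = spark.read
      .format("org.apache.iotdb.tsfile")   // package containing DefaultSource
      .load("/path/to/example.tsfile")     // illustrative path

    df.createOrReplaceTempView("tsfile_table")
    // With the new log statement, each task reports
    // "This partition starts from ... and ends at ...".
    println(spark.sql("select * from tsfile_table").count())

    // Restore Spark's 128 MB default so later reads are unaffected.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 128)
    spark.stop()
  }
}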
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -58,12 +58,12 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
//check if the path is given
options.getOrElse(DefaultSource.path, throw new TSFileDataSourceException(s"${DefaultSource.path} must be specified for org.apache.iotdb.tsfile DataSource"))
if(options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")){
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
val tsfileSchema = NarrowConverter.getUnionSeries(files, conf)
NarrowConverter.toSqlSchema(tsfileSchema)
}
else{
else {
//get union series in TsFile
val tsfileSchema = WideConverter.getUnionSeries(files, conf)
......@@ -92,6 +92,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
(file: PartitionedFile) => {
val log = LoggerFactory.getLogger(classOf[DefaultSource])
log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long] + " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
log.info(file.toString())
val conf = broadcastedConf.value.value
......@@ -118,21 +119,21 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names, measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
val queryDataSets = Executor.query(readTsFile, queryExpressions, file.start.asInstanceOf[java.lang.Long], (file.start + file.length).asInstanceOf[java.lang.Long])
var queryDataSet : QueryDataSet = null
var device_name:String = null
var queryDataSet: QueryDataSet = null
var device_name: String = null
def queryNext(): Boolean = {
if(queryDataSet != null && queryDataSet.hasNext){
if (queryDataSet != null && queryDataSet.hasNext) {
return true
}
if(queryDataSets.isEmpty){
if (queryDataSets.isEmpty) {
return false
}
queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
while(!queryDataSet.hasNext){
if(queryDataSets.isEmpty){
while (!queryDataSet.hasNext) {
if (queryDataSets.isEmpty) {
return false
}
queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
......@@ -166,7 +167,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
if (field.name == QueryConstant.RESERVED_TIME) {
rowBuffer(index) = curRecord.getTimestamp
}
else if(field.name == NarrowConverter.DEVICE_NAME){
else if (field.name == NarrowConverter.DEVICE_NAME) {
rowBuffer(index) = device_name
}
else {
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
......@@ -145,6 +145,16 @@ class TSFileSuit extends FunSuite with BeforeAndAfterAll {
Assert.assertEquals(TsFileWriteTool.largeNum, count)
}
test("testSelect * from tsfile2 in part") {
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 256)
val df = spark.read.tsfile(tsfile2)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select * from tsfile_table")
val count = newDf.count()
Assert.assertEquals(TsFileWriteTool.largeNum, count)
spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 128)
}
test("testCount") {
val df = spark.read.tsfile(tsfile1)
df.createOrReplaceTempView("tsfile_table")
......