提交 983f7613 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

1. Close the output stream when switch to write next file. (#1958)

2. Close the input stream when switch to read next file.

#1665
上级 7a9721a6
......@@ -21,6 +21,9 @@ package org.apache.skywalking.oap.server.library.buffer;
import java.util.Arrays;
/**
* This class is a util for sort or build file name for the gRPC streaming data.
* Sort the files by the created time in order to read the data file sequential.
*
* @author peng-yongsheng
*/
class BufferFileUtils {
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.library.buffer;
import com.google.protobuf.*;
import java.io.*;
import java.util.Objects;
import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
......@@ -78,8 +79,12 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private void openInputStream(File readingFile) {
try {
this.readingFile = readingFile;
if (Objects.nonNull(inputStream)) {
inputStream.close();
}
inputStream = new FileInputStream(readingFile);
} catch (FileNotFoundException e) {
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -88,6 +88,7 @@ class DataStreamWriter<MESSAGE_TYPE extends GeneratedMessageV3> {
writeOffset.setOffset(position);
if (position >= (FileUtils.ONE_MB * dataFileMaxSize)) {
File writingFile = createNewFile();
outputStream.close();
outputStream = FileUtils.openOutputStream(writingFile, true);
}
} catch (IOException e) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.buffer;
import java.util.*;
import org.junit.*;
/**
* @author peng-yongsheng
*/
public class BufferFileUtilsTestCase {
@Test
public void testSort() {
List<String> fileNames = new ArrayList<>();
fileNames.add("data-1.sw");
fileNames.add("data-3.sw");
fileNames.add("data-2.sw");
fileNames.add("data-8.sw");
fileNames.add("data-5.sw");
String[] files = fileNames.toArray(new String[0]);
BufferFileUtils.sort(files);
Assert.assertEquals("data-1.sw", files[0]);
Assert.assertEquals("data-2.sw", files[1]);
Assert.assertEquals("data-3.sw", files[2]);
Assert.assertEquals("data-5.sw", files[3]);
Assert.assertEquals("data-8.sw", files[4]);
}
}
......@@ -20,7 +20,7 @@
<Configuration status="DEBUG">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
<PatternLayout charset="UTF-8" pattern="%d - %c - %L [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
......
......@@ -63,13 +63,15 @@ public class AgentDataMock {
TimeUnit.SECONDS.sleep(10);
globalTraceId = UniqueIdBuilder.INSTANCE.create();
serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
for (int i = 0; i < 500; i++) {
globalTraceId = UniqueIdBuilder.INSTANCE.create();
serviceASegmentId = UniqueIdBuilder.INSTANCE.create();
serviceBSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceCSegmentId = UniqueIdBuilder.INSTANCE.create();
serviceAMock.mock(streamObserver, globalTraceId, serviceASegmentId, startTimestamp, false);
serviceBMock.mock(streamObserver, globalTraceId, serviceBSegmentId, serviceASegmentId, startTimestamp, false);
serviceCMock.mock(streamObserver, globalTraceId, serviceCSegmentId, serviceBSegmentId, startTimestamp, false);
}
streamObserver.onCompleted();
while (!IS_COMPLETED) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册