未验证 提交 069b99bd 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Add file change detection mechanism (#4475)

* Add file change detection mechanism

* Fix format.

* Make file content changed monitor easier.

* Add secondary round change and check the result.

* Update to support multiple files change monitoring.
上级 9c974b80
......@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
......@@ -35,8 +36,6 @@ import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TTL = Time To Live
......@@ -46,11 +45,10 @@ import org.slf4j.LoggerFactory;
* override TTL, which could be more suitable for the implementation. No matter which TTL configurations are set, they
* are all driven by this timer.
*/
@Slf4j
public enum DataTTLKeeperTimer {
INSTANCE;
private static final Logger logger = LoggerFactory.getLogger(DataTTLKeeperTimer.class);
private ModuleManager moduleManager;
private ClusterNodesQuery clusterNodesQuery;
......@@ -62,7 +60,7 @@ public enum DataTTLKeeperTimer {
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::delete,
t -> logger.error("Remove data in background failure.", t)
t -> log.error("Remove data in background failure.", t)
), moduleConfig
.getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
}
......@@ -74,11 +72,11 @@ public enum DataTTLKeeperTimer {
private void delete() {
List<RemoteInstance> remoteInstances = clusterNodesQuery.queryRemoteNodes();
if (CollectionUtils.isNotEmpty(remoteInstances) && !remoteInstances.get(0).getAddress().isSelf()) {
logger.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
log.info("The selected first getAddress is {}. Skip.", remoteInstances.get(0).toString());
return;
}
logger.info("Beginning to remove expired metrics from the storage.");
log.info("Beginning to remove expired metrics from the storage.");
IModelGetter modelGetter = moduleManager.find(CoreModule.NAME).provider().getService(IModelGetter.class);
List<Model> models = modelGetter.getModels();
models.forEach(model -> {
......@@ -95,8 +93,8 @@ public enum DataTTLKeeperTimer {
.getService(IHistoryDeleteDAO.class)
.deleteHistory(model, Metrics.TIME_BUCKET);
} catch (IOException e) {
logger.warn("History of {} delete failure", model.getName());
logger.error(e.getMessage(), e);
log.warn("History of {} delete failure", model.getName());
log.error(e.getMessage(), e);
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* MultipleFilesChangeMonitor provides the capability to detect file or multiple files changed. It provide second level
* change detection and feedback mechanism.
*
* Due to memory cost, this monitor mechanism is not suitable for small files and usually being changed on the runtime
* by user manually or 3rd party tool. Typical, these files are config information or authentication files.
*/
@Slf4j
public class MultipleFilesChangeMonitor {
/**
* The backend scheduler to trigger all file monitoring.
*/
private static ScheduledFuture<?> FILE_MONITOR_TASK_SCHEDULER;
private static ReentrantLock SCHEDULER_CHANGE_LOCK = new ReentrantLock();
/**
* The list contains all monitors.
*/
private static List<MultipleFilesChangeMonitor> MONITOR_INSTANCES = new ArrayList<>();
/**
* The timestamp when last time do status checked.
*/
private long lastCheckTimestamp = 0;
/**
* The period of watching thread checking the file status. Unit is the second.
*/
private final long watchingPeriodInSec;
private List<WatchedFile> watchedFiles;
private FilesChangedNotifier notifier;
/**
* Create a new monitor for the given files
*
* @param watchingPeriodInSec The check period.
* @param notifier to accept the file changed notification.
* @param files to be monitored.
*/
public MultipleFilesChangeMonitor(long watchingPeriodInSec,
FilesChangedNotifier notifier,
String... files) {
watchedFiles = new ArrayList<>();
this.watchingPeriodInSec = watchingPeriodInSec;
this.notifier = notifier;
for (final String file : files) {
WatchedFile monitor = new WatchedFile(file);
watchedFiles.add(monitor);
}
}
/**
* Check file changed status, if so, send the notification.
*/
private void checkAndNotify() {
if (System.currentTimeMillis() - lastCheckTimestamp < watchingPeriodInSec * 1000) {
// Don't reach the period threshold, ignore this check.
return;
}
boolean isChanged = false;
for (final WatchedFile watchedFile : watchedFiles) {
isChanged = isChanged || watchedFile.detectContentChanged();
}
if (isChanged) {
List<byte[]> contents = new ArrayList<>(watchedFiles.size());
watchedFiles.forEach(file -> {
contents.add(file.fileContent);
});
notifier.filesChanged(contents);
}
}
/**
* One file changed will cause all related files loaded from the disk again with lastModifiedTimestamp updated.
*/
public static void scanChanges() {
MONITOR_INSTANCES.forEach(group -> {
try {
group.checkAndNotify();
} catch (Throwable t) {
log.error("Files change detection failure, gourp = ", t);
}
});
}
/**
* Start the change monitoring.
*/
public void start() {
SCHEDULER_CHANGE_LOCK.lock();
try {
if (FILE_MONITOR_TASK_SCHEDULER == null) {
FILE_MONITOR_TASK_SCHEDULER = Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
MultipleFilesChangeMonitor::scanChanges, 1, 200,
TimeUnit.MILLISECONDS
);
}
if (MONITOR_INSTANCES.contains(this)) {
throw new IllegalStateException("This FileChangeMonitor has been started.");
}
this.checkAndNotify();
MONITOR_INSTANCES.add(this);
} finally {
SCHEDULER_CHANGE_LOCK.unlock();
}
}
/**
* Stop the change monitoring.
*/
public void stop() {
SCHEDULER_CHANGE_LOCK.lock();
try {
MONITOR_INSTANCES.remove(this);
} finally {
SCHEDULER_CHANGE_LOCK.unlock();
}
}
@Override
public String toString() {
return "MultipleFilesChangeMonitor{" +
"watchedFiles=" + watchedFiles +
'}';
}
/**
* The callback when files changed.
*/
public interface FilesChangedNotifier {
/**
* Notify the new content by providing the file input stream for all files in this group.
*
* @param readableContents include the new contents. NULL if the file doesn't exist.
*/
void filesChanged(List<byte[]> readableContents);
}
/**
* WatchedFile represents a file change detector. It could detect the file changed based on modified time and file
* content at the binary level. It load the file content into the memory as cache to do the comparison.
*/
@RequiredArgsConstructor
@Slf4j
private static class WatchedFile {
/**
* The absolute path of the monitored file.
*/
private final String filePath;
/**
* The last modify time of the {@link #filePath}
*/
private long lastModifiedTimestamp = 0;
/**
* File content at the latest status.
*/
private byte[] fileContent;
/**
* Detect the file content change, if yes, reload the file content into the memory as cached data.
*
* @return true if file content changed.
*/
boolean detectContentChanged() {
File targetFile = new File(filePath);
if (!targetFile.exists()) {
if (lastModifiedTimestamp == 0) {
//File doesn't exist before, no change detected.
return false;
} else {
// File has been deleted. Reset the modified timestamp.
lastModifiedTimestamp = 0;
return true;
}
} else {
long lastModified = targetFile.lastModified();
if (lastModified != lastModifiedTimestamp) {
// File modified timestamp changed. Need to read the file content.
try (FileInputStream fileInputStream = new FileInputStream(targetFile)) {
byte[] b = new byte[1024];
ByteArrayOutputStream os = new ByteArrayOutputStream();
int c;
while ((c = fileInputStream.read(b)) != -1) {
os.write(b, 0, c);
}
byte[] newContent = os.toByteArray();
if (!Arrays.equals(newContent, fileContent)) {
fileContent = newContent;
return true;
} else {
return false;
}
} catch (FileNotFoundException e) {
log.error("The existed file turns to missing, watch file=" + filePath, e);
} catch (IOException e) {
log.error("Read file failure, watch file=" + filePath, e);
} finally {
lastModifiedTimestamp = lastModified;
}
}
return false;
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.util;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.List;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class MultipleFilesChangeMonitorTest {
private static String FILE_NAME = "FileChangeMonitorTest.tmp";
@Test
public void test() throws InterruptedException, IOException {
StringBuilder content = new StringBuilder();
MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
1, new MultipleFilesChangeMonitor.FilesChangedNotifier() {
@Override
public void filesChanged(final List<byte[]> readableContents) {
Assert.assertEquals(2, readableContents.size());
Assert.assertNull(readableContents.get(1));
try {
content.delete(0, content.length());
content.append(new String(readableContents.get(0), 0, readableContents.get(0).length, "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}, FILE_NAME, "XXXX_NOT_EXIST.SW");
monitor.start();
File file = new File(FILE_NAME);
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file));
bos.write("test context".getBytes(Charset.forName("UTF-8")));
bos.flush();
bos.close();
int countDown = 40;
boolean notified = false;
boolean notified2 = false;
while (countDown-- > 0) {
if ("test context".equals(content.toString())) {
file = new File(FILE_NAME);
bos = new BufferedOutputStream(new FileOutputStream(file));
bos.write("test context again".getBytes(Charset.forName("UTF-8")));
bos.flush();
bos.close();
notified = true;
} else if ("test context again".equals(content.toString())) {
notified2 = true;
break;
}
Thread.sleep(500);
}
Assert.assertTrue(notified);
Assert.assertTrue(notified2);
}
@BeforeClass
@AfterClass
public static void cleanup() {
File file = new File(FILE_NAME);
if (file.exists() && file.isFile()) {
file.delete();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册