提交 6f25240d 编写于 作者: Z zhangxin10

从Git切换到Github

上级 78befc0e
#!/usr/bin/env bash
SW_PREFIX="${SW_SERVER_BIN}/.."
SW_LOG_DIR="${SW_SERVER_BIN}/../log"
SW_CFG_DIR="${SW_SERVER_BIN}/../config"
#设置Java home
if [ "$JAVA_HOME" != "" ]; then
JAVA="$JAVA_HOME/bin/java"
else
JAVA=java
fi
CLASSPATH="$SW_CFG_DIR:$CLASSPATH"
for i in "${SW_SERVER_BIN}"/lib/*.jar
do
CLASSPATH="$i:$CLASSPATH"
done
if $cygwin
then
CLASSPATH=`cygpath -wp "$CLASSPATH"`
fi
echo "CLASSPATH=$CLASSPATH"
\ No newline at end of file
#!/usr/bin/env bash
# use POSTIX interface, symlink is followed automatically
SW_SERVER_BIN="${BASH_SOURCE-$0}"
SW_SERVER_BIN="$(dirname "${SW_SERVER_BIN}")"
SW_SERVER_BIN="$(cd "${SW_SERVER_BIN}"; pwd)"
# 设置Skywalking的目录基本信息
. "SW_SERVER_BIN/swEnv.sh"
#采集服务器的端口
server.port=34000
#最大数据缓存线程数量
buffer.max_thread_number=1
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
#无数据处理时轮询等待时间(单位:毫秒)
buffer.max_wait_time=5000
#数据冲突时等待时间(单位:毫秒)
buffer.data_conflict_wait_time=10
#数据缓存文件目录
buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer
#缓存数据文件最大长度(单位:byte)
buffer.data_file_max_length=10240
#每次Flush的缓存数据的个数
buffer.flush_number_of_cache=30
#最大持久化的线程数量
persistence.max_thread_number=1
#定位文件时,每次读取偏移量跳过大小
persistence.offset_file_skip_length=2048
#每次读取文件偏移量大小
persistence.offset_file_read_buffer_size=2048
#处理文件完成之后,等待时间(单位:毫秒)
persistence.switch_file_wait_time=5000
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=D:/test-data/data/offset
#偏移量注册文件名
registerpersistence.register_file_name=offset.txt
#偏移量注册备份文件名
registerpersistence.register_bak_file_name=offset.txt.bak
#偏移量写入文件等待周期(单位:毫秒)
registerpersistence.offset_written_file_wait_cycle=5000
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ai.cloud</groupId>
<artifactId>skywalking-server</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>skywalking-server</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.6</version>
<executions>
<execution>
<id>copy-resources</id>
<phase>pre-integration-test</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.basedir}/installer/config</outputDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>pre-integration-test</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
</execution>
</executions>
<configuration>
<outputDirectory>${project.basedir}/installer/lib</outputDirectory>
<excludeTransitive>false</excludeTransitive>
<stripVersion>true</stripVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
package com.ai.cloud.skywalking.reciever;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThread;
import com.ai.cloud.skywalking.reciever.buffer.DataBufferThreadContainer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.conf.ConfigInitializer;
import com.ai.cloud.skywalking.reciever.persistance.PersistenceThreadLauncher;
import com.ai.cloud.skywalking.reciever.util.ByteArrayUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Properties;
public class CollectionServer {
static Logger logger = LogManager.getLogger(CollectionServer.class);
private Selector selector;
public CollectionServer() {
}
public void doCollect() throws IOException {
ServerSocketChannel serverSocketChannel = initServerSocketChannel();
DataBufferThread dataBuffer;
while (selector.select() > 0) {
Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
beginToRead(serverSocketChannel, key);
if (key.isReadable()) {
ByteChannel sc = (SocketChannel) key.channel();
ByteBuffer contextLengthBuffer = ByteBuffer.allocate(4);
try {
sc.read(contextLengthBuffer);
int length = ByteArrayUtil.byteArrayToInt(contextLengthBuffer.array(), 0);
if (length > 0) {
ByteBuffer contentBuffer = ByteBuffer.allocate(length);
sc.read(contentBuffer);
dataBuffer = DataBufferThreadContainer.getDataBufferThread();
dataBuffer.doCarry(new String(contentBuffer.array()));
}
contextLengthBuffer.flip();
} catch (IOException e) {
logger.error("The remote client disconnect service", e);
sc.close();
}
}
}
}
}
private void beginToRead(ServerSocketChannel serverSocketChannel, SelectionKey key) throws IOException {
if (key.isAcceptable()) {
SocketChannel sc = serverSocketChannel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
}
private ServerSocketChannel initServerSocketChannel() throws IOException {
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", Config.Server.PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
logger.info("The service is listening on port {}", Config.Server.PORT);
return serverSocketChannel;
}
public static void main(String[] args) throws IOException, IllegalAccessException {
logger.info("To initialize the collect server configuration parameters....");
initializeParam();
logger.info("To launch register persistence thread....");
PersistenceThreadLauncher.doLaunch();
logger.info("To init data buffer thread container...");
DataBufferThreadContainer.init();
logger.info("Starting collection server.....");
new CollectionServer().doCollect();
}
private static void initializeParam() throws IllegalAccessException, IOException {
Properties properties = new Properties();
try {
properties.load(CollectionServer.class.getResourceAsStream("/config.properties"));
ConfigInitializer.initialize(properties, Config.class);
} catch (IllegalAccessException e) {
logger.error("Initialize the collect server configuration failed", e);
throw e;
} catch (IOException e) {
logger.error("Initialize the collect server configuration failed", e);
throw e;
}
}
}
package com.ai.cloud.skywalking.reciever.buffer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.*;
public class DataBufferThread extends Thread {
private Logger logger = LogManager.getLogger(DataBufferThread.class);
private String[] data = new String[PER_THREAD_MAX_BUFFER_NUMBER];
private File file;
private FileOutputStream outputStream;
private AtomicInteger index = new AtomicInteger();
public DataBufferThread() {
try {
file = new File(DATA_BUFFER_FILE_PARENT_DIRECTORY, getFileName());
if (file.exists()) {
file.createNewFile();
}
if (logger.isDebugEnabled()){
logger.debug("Create buffer data file {}.", file.getName());
}
outputStream = new FileOutputStream(file, true);
} catch (FileNotFoundException e) {
logger.error("Data cache file cannot be created or written, please check the file system.", e);
System.exit(-1);
} catch (IOException e) {
logger.error("Data cache file cannot be created or written, please check the file system.", e);
System.exit(-1);
}
}
@Override
public void run() {
boolean isWriteFailure;
while (true) {
boolean bool = false;
int count = 0;
int index = 0;
for (int i = 0; i < data.length; i++, count++) {
if (data[i] == null) {
continue;
}
bool = true;
isWriteFailure = true;
while (isWriteFailure) {
try {
outputStream.write((data[i] + "\n").getBytes());
isWriteFailure = false;
} catch (IOException e) {
logger.error("Write buffer data failed.", e);
try {
Thread.sleep(WRITE_DATA_FAILURE_RETRY_INTERVAL);
} catch (InterruptedException e1) {
logger.error("Failure sleep.", e);
}
}
}
if (index++ > FLUSH_NUMBER_OF_CACHE) {
try {
outputStream.flush();
} catch (IOException e) {
logger.error("Flush buffer data failed.", e);
}
}
data[i] = null;
}
if (file.length() > DATA_FILE_MAX_LENGTH) {
convertFile();
}
if (!bool) {
try {
Thread.sleep(MAX_WAIT_TIME);
} catch (InterruptedException e) {
logger.error("Failure sleep.", e);
}
}
}
}
private String getFileName() {
return System.currentTimeMillis() + "-" + UUID.randomUUID().toString().replaceAll("-", "");
}
private void convertFile() {
String fileName = getFileName();
try {
outputStream.write("EOF\n".getBytes());
outputStream.flush();
outputStream.close();
} catch (IOException e) {
logger.error("Write cache data failed.", e);
}
logger.debug("Begin to switch the data file to {}.", fileName);
try {
file = new File(DATA_BUFFER_FILE_PARENT_DIRECTORY, fileName);
outputStream = new FileOutputStream(file, true);
} catch (IOException e) {
logger.error("Switch data file failed.", e);
}
}
public void doCarry(String s) {
int i = Math.abs(index.getAndIncrement() % data.length);
while (data[i] != null) {
try {
Thread.sleep(DATA_CONFLICT_WAIT_TIME);
} catch (InterruptedException e) {
logger.error("Failure sleep.", e);
}
}
data[i] = s;
}
}
package com.ai.cloud.skywalking.reciever.buffer;
import com.ai.cloud.skywalking.reciever.conf.Config;
import org.apache.commons.io.comparator.NameFileComparator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static com.ai.cloud.skywalking.reciever.conf.Config.Buffer.MAX_THREAD_NUMBER;
public class DataBufferThreadContainer {
private static Logger logger = LogManager.getLogger(DataBufferThreadContainer.class);
private static List<DataBufferThread> buffers = new ArrayList<DataBufferThread>();
private DataBufferThreadContainer() {
}
public static DataBufferThread getDataBufferThread() {
if (buffers.size() == 0) {
throw new RuntimeException("Data buffer thread pool is not init");
}
return buffers.get(ThreadLocalRandom.current().nextInt(buffers.size()));
}
public static void init() {
logger.info("Add EOF flags to the unprocessed data file last time.");
File parentDir = new File(Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY);
NameFileComparator sizeComparator = new NameFileComparator();
File[] dataFileList = sizeComparator.sort(parentDir.listFiles());
BufferedWriter bufferedWriter;
for (File file : dataFileList) {
try {
logger.info("Add EOF flags to unprocessed data file[{}]", file.getName());
bufferedWriter = new BufferedWriter(new FileWriter(new File(file.getParent(), file.getName()), true));
bufferedWriter.write("EOF\n");
bufferedWriter.flush();
bufferedWriter.close();
} catch (IOException e) {
logger.info("Add EOF flags to the unprocessed data file failed.", e);
}
}
logger.info("Data buffer thread size {} begin to init ", MAX_THREAD_NUMBER);
for (int i = 0; i < MAX_THREAD_NUMBER; i++) {
DataBufferThread dataBufferThread = new DataBufferThread();
dataBufferThread.start();
buffers.add(dataBufferThread);
}
}
}
package com.ai.cloud.skywalking.reciever.conf;
public class Config {
// 采集服务配置类
public static class Server {
// 采集服务器的端口
public static int PORT = 34000;
}
// 数据缓存配置类
public static class Buffer {
// 最大数据缓存线程数量
public static int MAX_THREAD_NUMBER = 1;
//每个线程最大缓存数量
public static int PER_THREAD_MAX_BUFFER_NUMBER = 1024;
// 无数据处理时轮询等待时间(单位:毫秒)
public static long MAX_WAIT_TIME = 5000L;
// 数据冲突时等待时间(单位:毫秒)
public static long DATA_CONFLICT_WAIT_TIME = 10L;
// 数据缓存文件目录
public static String DATA_BUFFER_FILE_PARENT_DIRECTORY = "../data/buffer";
// 缓存数据文件最大长度(单位:byte)
public static int DATA_FILE_MAX_LENGTH = 30 * 1024 * 1024;
// 每次缓存数据写入失败,最大尝试时间
public static long WRITE_DATA_FAILURE_RETRY_INTERVAL = 10 * 60 * 1000L;
//每次Flush的缓存数据的个数
public static int FLUSH_NUMBER_OF_CACHE = 30;
}
public static class Persistence {
// 最大持久化的线程数量
public static int MAX_THREAD_NUMBER = 1;
// 定位文件时,每次读取偏移量跳过大小
public static int OFFSET_FILE_SKIP_LENGTH = 2048;
// 每次读取文件偏移量大小
public static int OFFSET_FILE_READ_BUFFER_SIZE = 2048;
// 处理文件完成之后,等待时间
public static long SWITCH_FILE_WAIT_TIME = 5000L;
}
public static class RegisterPersistence {
// 偏移量注册文件的目录
public static String REGISTER_FILE_PARENT_DIRECTORY = "../data/offset";
// 偏移量注册文件名
public static String REGISTER_FILE_NAME = "offset.txt";
// 偏移量注册备份文件名
public static String REGISTER_BAK_FILE_NAME = "offset.txt.bak";
// 偏移量写入文件等待周期
public static long OFFSET_WRITTEN_FILE_WAIT_CYCLE = 5000L;
}
}
\ No newline at end of file
package com.ai.cloud.skywalking.reciever.conf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedList;
import java.util.Properties;
public class ConfigInitializer {
public static Logger logger = LogManager.getLogger(ConfigInitializer.class);
public static void initialize(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
for (Class innerConfiguration : rootConfigType.getClasses()) {
for (Field field : innerConfiguration.getFields()) {
if (9 != field.getModifiers())
continue;
String configKey = (innerConfiguration.getSimpleName() + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
}
logger.debug("{}={}", configKey, value);
}
}
}
public static void initialize2(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
initNextLevel(properties, rootConfigType, new ConfigDesc());
}
private static void initNextLevel(Properties properties, Class<?> recentConfigType, ConfigDesc parentDesc) throws NumberFormatException, IllegalArgumentException, IllegalAccessException {
for (Field field : recentConfigType.getFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
String configKey = (parentDesc + "." +
field.getName()).toLowerCase();
String value = properties.getProperty(configKey);
if (value != null) {
if (field.getType().equals(int.class))
field.set(null, Integer.valueOf(value));
if (field.getType().equals(String.class))
field.set(null, value);
if (field.getType().equals(long.class))
field.set(null, Long.valueOf(value));
}
logger.debug("{}={}", configKey, field.get(null));
}
}
for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
parentDesc.append(innerConfiguration.getSimpleName());
initNextLevel(properties, innerConfiguration, parentDesc);
parentDesc.removeLastDesc();
}
}
}
class ConfigDesc {
private LinkedList<String> descs = new LinkedList<String>();
void append(String currentDesc) {
descs.addLast(currentDesc);
}
void removeLastDesc() {
descs.removeLast();
}
@Override
public String toString() {
if (descs.size() == 0) {
return "";
}
StringBuilder ret = new StringBuilder(descs.getFirst());
boolean first = true;
for (String desc : descs) {
if (first) {
first = false;
continue;
}
ret.append(".").append(desc);
}
return ret.toString();
}
}
package com.ai.cloud.skywalking.reciever.constants;
public class ServerConstants {
}
package com.ai.cloud.skywalking.reciever.persistance;
public class FileRegisterEntry {
private String fileName;
private int offset;
private FileRegisterEntryStatus statue;
public FileRegisterEntry() {
}
public FileRegisterEntry(String fileName) {
this.fileName = fileName;
}
public FileRegisterEntry(String fileName, int offset) {
this.fileName = fileName;
this.offset = offset;
}
public FileRegisterEntry(String fileName, int offset, FileRegisterEntryStatus statue) {
this.fileName = fileName;
this.offset = offset;
this.statue = statue;
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
public FileRegisterEntryStatus getStatue() {
return statue;
}
public void setStatue(FileRegisterEntryStatus statue) {
this.statue = statue;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FileRegisterEntry)) return false;
FileRegisterEntry that = (FileRegisterEntry) o;
return !(getFileName() != null ? !getFileName().equals(that.getFileName()) : that.getFileName() != null);
}
@Override
public int hashCode() {
return getFileName() != null ? getFileName().hashCode() : 0;
}
@Override
public String toString() {
return fileName + '\t' + offset + "\t" + statue;
}
public enum FileRegisterEntryStatus {
REGISTER, UNREGISTER;
}
}
package com.ai.cloud.skywalking.reciever.persistance;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.ai.cloud.skywalking.reciever.conf.Config.RegisterPersistence.REGISTER_FILE_NAME;
import static com.ai.cloud.skywalking.reciever.conf.Config.RegisterPersistence.REGISTER_FILE_PARENT_DIRECTORY;
public class MemoryRegister {
private Logger logger = LogManager.getLogger(MemoryRegister.class);
private Map<String, FileRegisterEntry> entries = new ConcurrentHashMap<String, FileRegisterEntry>();
private File file;
private static MemoryRegister memoryRegister = new MemoryRegister();
public static MemoryRegister instance() {
return memoryRegister;
}
public synchronized void doRegisterStatus(FileRegisterEntry fileRegisterEntry) {
if (logger.isDebugEnabled()) {
logger.debug("Register entry[{}] into the memory register", fileRegisterEntry.getFileName());
}
entries.put(fileRegisterEntry.getFileName(), fileRegisterEntry);
}
public void unRegister(String fileName) {
if (logger.isDebugEnabled()) {
logger.debug("Unregister[{}] from the memory register", fileName);
}
entries.remove(fileName);
}
public synchronized boolean isRegister(String fileName) {
if (entries.containsKey(fileName)) {
if (entries.get(fileName).getStatue() == FileRegisterEntry.FileRegisterEntryStatus.REGISTER) {
if (logger.isDebugEnabled()) {
logger.debug("Entry[{}] has been register", fileName);
}
return true;
}
}
return false;
}
public Collection<FileRegisterEntry> getEntries() {
return entries.values();
}
public int getOffSet(String fileName) {
if (entries.containsKey(fileName)) {
return entries.get(fileName).getOffset();
}
return -1;
}
private void checkOffSetExists() {
file = new File(REGISTER_FILE_PARENT_DIRECTORY, REGISTER_FILE_NAME);
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
logger.error("Create offset filed failed", e);
}
}
}
private MemoryRegister() {
// 读取offset文件
checkOffSetExists();
BufferedReader reader;
// 在处理数据之前需要初始化处理文件的处理状态
try {
reader = new BufferedReader(new FileReader(file));
String offsetData;
while ((offsetData = reader.readLine()) != null && !"EOF".equals(offsetData)) {
String[] ss = offsetData.split("\t");
entries.put(ss[0], new FileRegisterEntry(ss[0], Integer.valueOf(ss[1]), FileRegisterEntry.FileRegisterEntryStatus.UNREGISTER));
}
} catch (FileNotFoundException e) {
logger.error("The offset file does not exist.", e);
checkOffSetExists();
} catch (IOException e) {
logger.error("Read data from offset file failed.", e);
}
}
}
package com.ai.cloud.skywalking.reciever.persistance;
import com.ai.cloud.skywalking.reciever.conf.Config;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.comparator.NameFileComparator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import static com.ai.cloud.skywalking.reciever.conf.Config.Persistence.*;
public class PersistenceThread extends Thread {
private Logger logger = LogManager.getLogger(PersistenceThread.class);
@Override
public void run() {
while (true) {
File file1 = getDataFiles();
if (file1 == null) {
try {
Thread.sleep(SWITCH_FILE_WAIT_TIME);
} catch (InterruptedException e) {
logger.error("Failure sleep", e);
}
continue;
}
try {
BufferedReader bufferedReader = new BufferedReader(new FileReader(file1));
int offset = moveOffSet(file1, bufferedReader);
if (logger.isDebugEnabled()) {
logger.debug("Get file[{}] offset [{}]", file1.getName(), offset);
}
char[] chars = new char[OFFSET_FILE_READ_BUFFER_SIZE];
StringBuffer data = new StringBuffer();
boolean bool = true;
while (bool) {
if (bufferedReader.read(chars, 0, chars.length) == -1) {
MemoryRegister.instance().doRegisterStatus(new FileRegisterEntry(file1.getName(), offset,
FileRegisterEntry.FileRegisterEntryStatus.UNREGISTER));
break;
}
for (int i = 0; i < chars.length; i++) {
if (chars[i] != '\0'){
offset++;
}
if (chars[i] != '\n') {
data.append(chars[i]);
continue;
}
// HBase
System.out.println(data);
if ("EOF".equals(data.toString())) {
bufferedReader.close();
logger.info("Data in file[{}] has been successfully processed", file1.getName());
boolean deleteSuccess = false;
while(!deleteSuccess) {
deleteSuccess = FileUtils.deleteQuietly(new File(file1.getParent(),file1.getName()));
}
logger.info("Delete file[{}] {}", file1.getName(), (deleteSuccess ? "success" : "failed"));
MemoryRegister.instance().unRegister(file1.getName());
bool = false;
break;
}
data.delete(0, data.length());
}
}
} catch (FileNotFoundException e) {
logger.error("The data file could not be found", e);
} catch (IOException e) {
logger.error("The data file could not be found", e);
}
try {
Thread.sleep(SWITCH_FILE_WAIT_TIME);
} catch (InterruptedException e) {
logger.error("Failure sleep.", e);
}
}
}
private int moveOffSet(File file1, BufferedReader bufferedReader) throws IOException {
int offset = MemoryRegister.instance().getOffSet(file1.getName());
if (-1 == offset || offset == 0) {
// 以前该文件没有被任何人处理过,需要重新注册
MemoryRegister.instance().doRegisterStatus(new FileRegisterEntry(file1.getName(), 0,
FileRegisterEntry.FileRegisterEntryStatus.REGISTER));
offset = 0;
} else {
char[] cha = new char[OFFSET_FILE_SKIP_LENGTH];
int length = 0;
while (length + OFFSET_FILE_SKIP_LENGTH < offset) {
length += OFFSET_FILE_SKIP_LENGTH;
bufferedReader.read(cha);
}
bufferedReader.read(cha, 0, Math.abs(offset - length));
cha = null;
}
return offset;
}
private File getDataFiles() {
File file1 = null;
File parentDir = new File(Config.Buffer.DATA_BUFFER_FILE_PARENT_DIRECTORY);
NameFileComparator sizeComparator = new NameFileComparator();
File[] dataFileList = sizeComparator.sort(parentDir.listFiles());
for (File file : dataFileList) {
if (MemoryRegister.instance().isRegister(file.getName())) {
if (logger.isDebugEnabled())
logger.debug("The file [{}] is being used by another thread ", file);
continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Begin to deal data file [{}]", file.getName());
}
file1 = file;
break;
}
return file1;
}
}
package com.ai.cloud.skywalking.reciever.persistance;
import com.ai.cloud.skywalking.reciever.conf.Config;
public class PersistenceThreadLauncher {
public static void doLaunch() {
new RegisterPersistenceThread().start();
for (int i = 0; i < Config.Persistence.MAX_THREAD_NUMBER; i++) {
new PersistenceThread().start();
}
}
}
package com.ai.cloud.skywalking.reciever.persistance;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collection;
import static com.ai.cloud.skywalking.reciever.conf.Config.RegisterPersistence.*;
public class RegisterPersistenceThread extends Thread {
private Logger logger = LogManager.getLogger(RegisterPersistenceThread.class);
private BufferedWriter writer;
public RegisterPersistenceThread() {
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(OFFSET_WRITTEN_FILE_WAIT_CYCLE);
} catch (InterruptedException e) {
logger.error("Sleep failure", e);
}
//
File file = new File(REGISTER_FILE_PARENT_DIRECTORY, REGISTER_FILE_NAME);
File bakFile = new File(REGISTER_FILE_PARENT_DIRECTORY, REGISTER_BAK_FILE_NAME);
try {
FileUtils.copyFile(file, bakFile);
} catch (IOException e) {
logger.error("Sleep failure", e);
}
Collection<FileRegisterEntry> fileRegisterEntries = MemoryRegister.instance().getEntries();
logger.info("file Register Entries size [{}]", fileRegisterEntries.size());
BufferedWriter writer = null;
try {
writer = new BufferedWriter(new FileWriter(file));
} catch (IOException e) {
logger.error("Write The offset file anomalies.");
System.exit(-1);
}
//
for (FileRegisterEntry fileRegisterEntry : fileRegisterEntries) {
try {
writer.write(fileRegisterEntry.toString() + "\n");
} catch (IOException e) {
e.printStackTrace();
}
}
try {
writer.write("EOF\n");
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.ai.cloud.skywalking.reciever.util;
public class ByteArrayUtil {
private ByteArrayUtil() {
}
public static int byteArrayToInt(byte[] b, int offset) {
int value = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
value += (b[i + offset] & 0x000000FF) << shift;
}
return value;
}
}
#采集服务器的端口
server.port=34000
#最大数据缓存线程数量
buffer.max_thread_number=1
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
#无数据处理时轮询等待时间(单位:毫秒)
buffer.max_wait_time=5000
#数据冲突时等待时间(单位:毫秒)
buffer.data_conflict_wait_time=10
#数据缓存文件目录
buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer
#缓存数据文件最大长度(单位:byte)
buffer.data_file_max_length=10240
#每次Flush的缓存数据的个数
buffer.flush_number_of_cache=30
#最大持久化的线程数量
persistence.max_thread_number=1
#定位文件时,每次读取偏移量跳过大小
persistence.offset_file_skip_length=2048
#每次读取文件偏移量大小
persistence.offset_file_read_buffer_size=2048
#处理文件完成之后,等待时间(单位:毫秒)
persistence.switch_file_wait_time=5000
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=D:/test-data/data/offset
#偏移量注册文件名
registerpersistence.register_file_name=offset.txt
#偏移量注册备份文件名
registerpersistence.register_bak_file_name=offset.txt.bak
#偏移量写入文件等待周期(单位:毫秒)
registerpersistence.offset_written_file_wait_cycle=5000
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
package com.ai.cloud.skywalking.reciever.thread;
import com.ai.cloud.skywalking.reciever.persistance.PersistenceThread;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
/**
* Created by astraea on 2015/11/5.
*/
public class PersistenceThreadTest {
@Test
public void testDealDataFile() throws InterruptedException {
new PersistenceThread().start();
Thread.sleep(500000L);
}
@Test
public void test() {
System.out.println(Integer.valueOf(30 * 1024* 1024));
}
@Test
public void testFile() throws IOException {
File file = new File("D:\\test-data\\data\\buffer", "1446801421453-e10f3cc6279d48bebbd03fc3938ad665");
FileWriter writer = new FileWriter(file,true);
writer.write("ssss");
writer.flush();
writer.close();
System.out.println(FileUtils.deleteQuietly(file));
}
}
\ No newline at end of file
#采集服务器的端口
server.port=34000
#最大数据缓存线程数量
buffer.max_thread_number=1
#每个线程最大缓存数量
buffer.per_thread_max_buffer_number=1024
#无数据处理时轮询等待时间(单位:毫秒)
buffer.max_wait_time=5000
#数据冲突时等待时间(单位:毫秒)
buffer.data_conflict_wait_time=10
#数据缓存文件目录
buffer.data_buffer_file_parent_directory=D:/test-data/data/buffer
#缓存数据文件最大长度(单位:byte)
buffer.data_file_max_length=10240
#每次Flush的缓存数据的个数
buffer.flush_number_of_cache=30
#最大持久化的线程数量
persistence.max_thread_number=1
#定位文件时,每次读取偏移量跳过大小
persistence.offset_file_skip_length=2048
#每次读取文件偏移量大小
persistence.offset_file_read_buffer_size=2048
#处理文件完成之后,等待时间(单位:毫秒)
persistence.switch_file_wait_time=5000
#偏移量注册文件的目录
registerpersistence.register_file_parent_directory=D:/test-data/data/offset
#偏移量注册文件名
registerpersistence.register_file_name=offset.txt
#偏移量注册备份文件名
registerpersistence.register_bak_file_name=offset.txt.bak
#偏移量写入文件等待周期(单位:毫秒)
registerpersistence.offset_written_file_wait_cycle=5000
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="debug">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
\ No newline at end of file
#Generated by Maven
#Sat Nov 07 10:31:28 CST 2015
version=1.0-SNAPSHOT
groupId=com.ai.cloud
artifactId=skywalking-server
com\ai\cloud\skywalking\reciever\util\ByteArrayUtil.class
com\ai\cloud\skywalking\reciever\conf\Config.class
com\ai\cloud\skywalking\reciever\persistance\RegisterPersistenceThread.class
com\ai\cloud\skywalking\reciever\buffer\DataBufferThreadContainer.class
com\ai\cloud\skywalking\reciever\persistance\PersistenceThread.class
com\ai\cloud\skywalking\reciever\persistance\FileRegisterEntry$FileRegisterEntryStatus.class
com\ai\cloud\skywalking\reciever\CollectionServer.class
com\ai\cloud\skywalking\reciever\buffer\DataBufferThread.class
com\ai\cloud\skywalking\reciever\conf\Config$Persistence.class
com\ai\cloud\skywalking\reciever\constants\ServerConstants.class
com\ai\cloud\skywalking\reciever\conf\ConfigDesc.class
com\ai\cloud\skywalking\reciever\conf\Config$RegisterPersistence.class
com\ai\cloud\skywalking\reciever\persistance\FileRegisterEntry.class
com\ai\cloud\skywalking\reciever\conf\ConfigInitializer.class
com\ai\cloud\skywalking\reciever\persistance\PersistenceThreadLauncher.class
com\ai\cloud\skywalking\reciever\persistance\MemoryRegister.class
com\ai\cloud\skywalking\reciever\conf\Config$Server.class
com\ai\cloud\skywalking\reciever\conf\Config$Buffer.class
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\persistance\PersistenceThread.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\conf\ConfigInitializer.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\util\ByteArrayUtil.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\constants\ServerConstants.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\CollectionServer.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\persistance\FileRegisterEntry.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\buffer\DataBufferThread.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\conf\Config.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\persistance\RegisterPersistenceThread.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\buffer\DataBufferThreadContainer.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\persistance\MemoryRegister.java
D:\iworkbench\PaaS-PerformanceMonitor-DEV\skywalking-server\src\main\java\com\ai\cloud\skywalking\reciever\persistance\PersistenceThreadLauncher.java
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册