Commit aeeb221f authored by zhangxin10

1. Add HBase configuration

2. Add HBase interface
Parent aa3c89ce
......@@ -36,6 +36,11 @@
<artifactId>netty-all</artifactId>
<version>4.0.33.Final</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>
<build>
<plugins>
......@@ -126,6 +131,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -65,7 +65,17 @@ public class Config {
// Wait cycle before the offset is written to file (unit: milliseconds)
public static long OFFSET_WRITTEN_FILE_WAIT_CYCLE = 5000L;
}
public static class HBaseConfig {
// HBase table that stores call-chain data
public static String TABLE_NAME = "sw-call-chain";
// Column family of the call-chain table
public static String FAMILY_COLUMN_NAME = "call-chain";
// ZooKeeper quorum of the HBase cluster
public static String ZK_HOSTNAME;
// ZooKeeper client port
public static String CLIENT_PORT;
}
}
\ No newline at end of file
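ZK_HOSTNAME and CLIENT_PORT above are left null; they are populated from config.properties at startup. The call pattern below is the one used in HBaseOperator.main further down; the key scheme (lower-cased nested-class and field names, e.g. hbaseconfig.zk_hostname -> Config.HBaseConfig.ZK_HOSTNAME) is inferred from the properties file at the end of this commit, since ConfigInitializer itself is not shown in this diff.

// Loads config.properties and fills Config's static fields (as done in HBaseOperator.main below).
// Assumed key scheme: "hbaseconfig.zk_hostname" -> Config.HBaseConfig.ZK_HOSTNAME.
// Runs inside a method declared to throw IOException and IllegalAccessException.
Properties config = new Properties();
config.load(HBaseOperator.class.getResourceAsStream("/config.properties"));
ConfigInitializer.initialize(config, Config.class);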
package com.ai.cloud.skywalking.reciever.hbase;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.conf.ConfigInitializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;
public class HBaseOperator {
private static Logger logger = LogManager.getLogger(HBaseOperator.class);
private static Configuration configuration = null;
private static Connection connection;
private static void initHBaseClient() throws IOException {
// Development leftover: hard-coded local Hadoop home (Windows path); should come from the environment.
System.setProperty("hadoop.home.dir", "D:/hadoop/hadoop-2.4.1");
if (configuration == null) {
configuration = HBaseConfiguration.create();
if (Config.HBaseConfig.ZK_HOSTNAME == null || "".equals(Config.HBaseConfig.ZK_HOSTNAME)) {
logger.error("Miss HBase ZK quorum Configuration", new IllegalArgumentException("Miss HBase ZK quorum Configuration"));
System.exit(-1);
}
configuration.set("hbase.zookeeper.quorum", Config.HBaseConfig.ZK_HOSTNAME);
configuration.set("hbase.zookeeper.property.clientPort", Config.HBaseConfig.CLIENT_PORT);
connection = ConnectionFactory.createConnection(configuration);
}
}
private static void createTable(String tableName) {
try {
initHBaseClient();
Admin admin = connection.getAdmin();
try {
if (!admin.isTableAvailable(TableName.valueOf(tableName))) {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
tableDesc.addFamily(new HColumnDescriptor(Config.HBaseConfig.FAMILY_COLUMN_NAME));
admin.createTable(tableDesc);
logger.info("Create table [{}] ok!", tableName);
}
} finally {
admin.close();
}
} catch (IOException e) {
logger.error("Create table[{}] failed", tableName, e);
}
}
public static void insert(String rowKey, String qualifier, String value) {
insert(Config.HBaseConfig.TABLE_NAME, rowKey, qualifier, value);
}
public static void insert(String tableName, String rowKey, String qualifier, String value) {
try {
// Ensures the connection is initialized and the target table exists (checked on every insert).
createTable(tableName);
Table table = connection.getTable(TableName.valueOf(tableName));
try {
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(Config.HBaseConfig.FAMILY_COLUMN_NAME), Bytes.toBytes(qualifier),
Bytes.toBytes(value));
table.put(put);
} finally {
table.close();
}
if (logger.isDebugEnabled()) {
logger.debug("Insert data[RowKey:{}] success.", rowKey);
}
} catch (IOException e) {
logger.error("Insert the data error.RowKey:[{}],Qualifier[{}],value[{}]", rowKey, qualifier, value, e);
}
}
// Manual smoke test: loads the configuration and creates a throwaway table.
public static void main(String[] args) throws IllegalAccessException, IOException {
Properties config = new Properties();
config.load(HBaseOperator.class.getResourceAsStream("/config.properties"));
ConfigInitializer.initialize(config, Config.class);
HBaseOperator.createTable("test3");
}
}
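Not part of this commit: a hypothetical read-side counterpart, fetching every span of one trace by row key. It assumes access to the same Connection and the schema used by insert(...) above (family call-chain, qualifier parentLevel.levelId, value = raw serialized span); it also needs org.apache.hadoop.hbase.Cell and org.apache.hadoop.hbase.CellUtil imports.

// Hypothetical reader: all spans of a trace live in a single row; "trace1" is illustrative.
Table table = connection.getTable(TableName.valueOf(Config.HBaseConfig.TABLE_NAME));
try {
    Result result = table.get(new Get(Bytes.toBytes("trace1")));
    for (Cell cell : result.rawCells()) {
        String position = Bytes.toString(CellUtil.cloneQualifier(cell)); // parentLevel.levelId
        String span = Bytes.toString(CellUtil.cloneValue(cell));         // raw buried-point string
    }
} finally {
    table.close();
}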
package com.ai.cloud.skywalking.reciever.model;
import java.util.Date;
public class BuriedPointEntry {
private String traceId;
private String parentLevel;
private int levelId;
private String viewPointId;
private Date startDate;
private long cost;
private String address;
private byte statusCode = 0;
private String exceptionStack;
private char spanType;
private boolean isReceiver = false;
private String businessKey;
private String processNo;
public String getTraceId() {
return traceId;
}
public String getParentLevel() {
return parentLevel;
}
public int getLevelId() {
return levelId;
}
public String getViewPointId() {
return viewPointId;
}
public Date getStartDate() {
return startDate;
}
public long getCost() {
return cost;
}
public String getAddress() {
return address;
}
public byte getStatusCode() {
return statusCode;
}
public String getExceptionStack() {
return exceptionStack;
}
public char getSpanType() {
return spanType;
}
public boolean isReceiver() {
return isReceiver;
}
public String getBusinessKey() {
return businessKey;
}
public String getProcessNo() {
return processNo;
}
public static BuriedPointEntry convert(String str) {
BuriedPointEntry result = new BuriedPointEntry();
// The serialized form carries 12 dash-separated fields; statusCode keeps its default.
String[] fieldValues = str.split("-");
result.traceId = fieldValues[0];
result.parentLevel = fieldValues[1];
result.levelId = Integer.valueOf(fieldValues[2]);
result.viewPointId = fieldValues[3];
result.startDate = new Date(Long.valueOf(fieldValues[4]));
// Long.getLong/Boolean.getBoolean read system properties; parseLong/parseBoolean parse the value itself.
result.cost = Long.parseLong(fieldValues[5]);
result.address = fieldValues[6];
result.exceptionStack = fieldValues[7];
result.spanType = fieldValues[8].charAt(0);
result.isReceiver = Boolean.parseBoolean(fieldValues[9]);
result.businessKey = fieldValues[10];
result.processNo = fieldValues[11];
return result;
}
}
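For reference, a hedged illustration of the dash-separated wire format convert(...) expects; the sample values are hypothetical, and the field order follows the assignments above. Note that splitting on '-' assumes no field, including the trace id, contains a dash itself.

// Fields: traceId-parentLevel-levelId-viewPointId-startDate-cost-address-exceptionStack-spanType-isReceiver-businessKey-processNo
String sample = "trace1-0-1-com.demo.Service.hello()-1449456000000-15-10.1.235.197-null-S-false-bk-12345";
BuriedPointEntry entry = BuriedPointEntry.convert(sample);
// entry.getTraceId() -> "trace1", entry.getCost() -> 15L, entry.getSpanType() -> 'S'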
package com.ai.cloud.skywalking.reciever.persistance;
import com.ai.cloud.skywalking.reciever.conf.Config;
import com.ai.cloud.skywalking.reciever.hbase.HBaseOperator;
import com.ai.cloud.skywalking.reciever.model.BuriedPointEntry;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.comparator.NameFileComparator;
import org.apache.logging.log4j.LogManager;
......@@ -21,6 +23,7 @@ public class PersistenceThread extends Thread {
BufferedReader bufferedReader;
int offset;
StringBuffer data;
String[] buriedPointData;
while (true) {
file1 = getDataFiles();
if (file1 == null) {
......@@ -54,8 +57,12 @@ public class PersistenceThread extends Thread {
data.append(chars[i]);
continue;
}
// Split the buffered block into buried points and persist each one to HBase,
// keyed by trace id, with parentLevel.levelId as the column qualifier.
buriedPointData = data.toString().split(";");
for (String buriedPoint : buriedPointData) {
// Skip empty segments and the end-of-file sentinel handled below; converting them would throw.
if (buriedPoint.isEmpty() || "EOF".equals(buriedPoint)) {
continue;
}
BuriedPointEntry entry = BuriedPointEntry.convert(buriedPoint);
HBaseOperator.insert(entry.getTraceId(), entry.getParentLevel() + "." + entry.getLevelId(), buriedPoint);
}
if ("EOF".equals(data.toString())) {
bufferedReader.close();
......@@ -112,9 +119,9 @@ public class PersistenceThread extends Thread {
NameFileComparator nameComparator = new NameFileComparator();
File[] dataFileList = nameComparator.sort(parentDir.listFiles());
for (File file : dataFileList) {
if(file.getName().startsWith(".")){
continue;
}
if (file.getName().startsWith(".")) {
continue;
}
if (MemoryRegister.instance().isRegister(file.getName())) {
if (logger.isDebugEnabled())
logger.debug("The file [{}] is being used by another thread ", file);
......
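A side effect of the qualifier choice above, not stated in the commit: HBase keeps cells within a row sorted by qualifier bytes, so the spans of a trace come back in lexicographic parentLevel.levelId order, which approximates the call tree but breaks once a level has more than nine children. A small illustration (plain Java, no HBase needed; for ASCII qualifiers, String ordering matches HBase's byte ordering):

// Per-row cell order in HBase is byte-lexicographic on the qualifier.
String[] qualifiers = {"0", "0.2", "0.1", "0.10", "0.1.1"};
java.util.Arrays.sort(qualifiers);
// Prints [0, 0.1, 0.1.1, 0.10, 0.2] -- note "0.10" sorts before "0.2".
System.out.println(java.util.Arrays.toString(qualifiers));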
......@@ -34,4 +34,13 @@ registerpersistence.register_file_name=offset.txt
#Backup file name for the offset register
registerpersistence.register_bak_file_name=offset.txt.bak
#Wait cycle before the offset is written to file (unit: milliseconds)
registerpersistence.offset_written_file_wait_cycle=5000
\ No newline at end of file
registerpersistence.offset_written_file_wait_cycle=5000
#HBase table name
hbaseconfig.table_name=sw-call-chain
#HBase column family name
hbaseconfig.family_column_name=call-chain
#HBase ZooKeeper quorum
hbaseconfig.zk_hostname=10.1.235.197,10.1.235.198,10.1.235.199
#HBase ZooKeeper client port
hbaseconfig.client_port=29181
\ No newline at end of file