Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
2042709b
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2042709b
编写于
7月 21, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Storage framework, support auto create elastic search or h2 table
上级
706874c9
变更
32
隐藏空白更改
内联
并排
Showing
32 changed file
with
594 addition
and
27 deletion
+594
-27
apm-collector/apm-collector-agentstream/pom.xml
apm-collector/apm-collector-agentstream/pom.xml
+5
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
...llector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
+5
-1
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java
.../agentstream/grpc/handler/TraceSegmentServiceHandler.java
+32
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/Const.java
...rg/skywalking/apm/collector/agentstream/worker/Const.java
+13
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/NodeComponentTable.java
...collector/agentstream/worker/node/NodeComponentTable.java
+10
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java
...stream/worker/node/define/NodeComponentEsTableDefine.java
+32
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java
...stream/worker/node/define/NodeComponentH2TableDefine.java
+20
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
...stream/src/main/resources/META-INF/defines/storage.define
+2
-0
apm-collector/apm-collector-boot/src/main/resources/application.yml
...tor/apm-collector-boot/src/main/resources/application.yml
+32
-0
apm-collector/apm-collector-client/pom.xml
apm-collector/apm-collector-client/pom.xml
+1
-1
apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
...m/collector/client/elasticsearch/ElasticSearchClient.java
+27
-2
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Writer.java
...a/org/skywalking/apm/collector/core/framework/Writer.java
+0
-7
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/ColumnDefine.java
...g/skywalking/apm/collector/core/storage/ColumnDefine.java
+22
-0
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Storage.java
...va/org/skywalking/apm/collector/core/storage/Storage.java
+0
-8
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java
...lking/apm/collector/core/storage/StorageDefineLoader.java
+30
-0
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefinitionFile.java
...ing/apm/collector/core/storage/StorageDefinitionFile.java
+12
-0
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstallException.java
...g/apm/collector/core/storage/StorageInstallException.java
+15
-0
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java
...ywalking/apm/collector/core/storage/StorageInstaller.java
+37
-0
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/TableDefine.java
...rg/skywalking/apm/collector/core/storage/TableDefine.java
+31
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java
...skywalking/apm/collector/storage/StorageModuleDefine.java
+13
-5
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleInstaller.java
...walking/apm/collector/storage/StorageModuleInstaller.java
+1
-1
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/ElasticSearchStorageException.java
.../storage/elasticsearch/ElasticSearchStorageException.java
+16
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java
...tor/storage/elasticsearch/StorageElasticSearchConfig.java
+1
-1
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java
...orage/elasticsearch/StorageElasticSearchConfigParser.java
+1
-1
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java
...orage/elasticsearch/StorageElasticSearchModuleDefine.java
+6
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchColumnDefine.java
...orage/elasticsearch/define/ElasticSearchColumnDefine.java
+16
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java
...e/elasticsearch/define/ElasticSearchStorageInstaller.java
+97
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java
...torage/elasticsearch/define/ElasticSearchTableDefine.java
+23
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java
...lking/apm/collector/storage/h2/StorageH2ModuleDefine.java
+6
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2ColumnDefine.java
...lking/apm/collector/storage/h2/define/H2ColumnDefine.java
+17
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java
...g/apm/collector/storage/h2/define/H2StorageInstaller.java
+58
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2TableDefine.java
...alking/apm/collector/storage/h2/define/H2TableDefine.java
+13
-0
未找到文件。
apm-collector/apm-collector-agentstream/pom.xml
浏览文件 @
2042709b
...
...
@@ -28,5 +28,10 @@
<artifactId>
apm-collector-server
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<groupId>
org.skywalking
</groupId>
<artifactId>
apm-collector-storage
</artifactId>
<version>
${project.version}
</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.grpc
;
import
java.util.LinkedList
;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine
;
import
org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine
;
import
org.skywalking.apm.collector.agentstream.grpc.handler.TraceSegmentServiceHandler
;
import
org.skywalking.apm.collector.core.cluster.ClusterDataListener
;
import
org.skywalking.apm.collector.core.framework.Handler
;
import
org.skywalking.apm.collector.core.module.ModuleConfigParser
;
...
...
@@ -42,6 +44,8 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
}
@Override
public
List
<
Handler
>
handlerList
()
{
return
null
;
List
<
Handler
>
handlers
=
new
LinkedList
<>();
handlers
.
add
(
new
TraceSegmentServiceHandler
());
return
handlers
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/TraceSegmentServiceHandler.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.grpc.handler
;
import
io.grpc.stub.StreamObserver
;
import
org.skywalking.apm.collector.server.grpc.GRPCHandler
;
import
org.skywalking.apm.network.proto.Downstream
;
import
org.skywalking.apm.network.proto.TraceSegmentServiceGrpc
;
import
org.skywalking.apm.network.proto.UpstreamSegment
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
TraceSegmentServiceHandler
extends
TraceSegmentServiceGrpc
.
TraceSegmentServiceImplBase
implements
GRPCHandler
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
TraceSegmentServiceHandler
.
class
);
@Override
public
StreamObserver
<
UpstreamSegment
>
collect
(
StreamObserver
<
Downstream
>
responseObserver
)
{
return
new
StreamObserver
<
UpstreamSegment
>()
{
@Override
public
void
onNext
(
UpstreamSegment
segment
)
{
}
@Override
public
void
onError
(
Throwable
throwable
)
{
logger
.
error
(
throwable
.
getMessage
(),
throwable
);
}
@Override
public
void
onCompleted
()
{
responseObserver
.
onCompleted
();
}
};
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/Const.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.worker
;
/**
* @author pengys5
*/
public
class
Const
{
public
static
final
String
ID_SPLIT
=
"..-.."
;
public
static
final
String
IDS_SPLIT
=
"\\.\\.-\\.\\."
;
public
static
final
String
PEERS_FRONT_SPLIT
=
"["
;
public
static
final
String
PEERS_BEHIND_SPLIT
=
"]"
;
public
static
final
String
USER_CODE
=
"User"
;
public
static
final
String
RESULT
=
"result"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/NodeComponentTable.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.worker.node
;
/**
* @author pengys5
*/
public
class
NodeComponentTable
{
public
static
final
String
TABLE
=
"node_component"
;
public
static
final
String
COLUMN_NAME
=
"name"
;
public
static
final
String
COLUMN_PEERS
=
"peers"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentEsTableDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
import
org.skywalking.apm.collector.agentstream.worker.node.NodeComponentTable
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
/**
* @author pengys5
*/
public
class
NodeComponentEsTableDefine
extends
ElasticSearchTableDefine
{
public
NodeComponentEsTableDefine
()
{
super
(
NodeComponentTable
.
TABLE
);
}
@Override
public
int
refreshInterval
()
{
return
0
;
}
@Override
public
int
numberOfShards
()
{
return
2
;
}
@Override
public
int
numberOfReplicas
()
{
return
0
;
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_NAME
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
NodeComponentTable
.
COLUMN_PEERS
,
ElasticSearchColumnDefine
.
Type
.
Keyword
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/node/define/NodeComponentH2TableDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.agentstream.worker.node.define
;
import
org.skywalking.apm.collector.agentstream.worker.node.NodeComponentTable
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
/**
* @author pengys5
*/
public
class
NodeComponentH2TableDefine
extends
H2TableDefine
{
public
NodeComponentH2TableDefine
()
{
super
(
NodeComponentTable
.
TABLE
);
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_NAME
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
NodeComponentTable
.
COLUMN_PEERS
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
0 → 100644
浏览文件 @
2042709b
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentH2TableDefine
\ No newline at end of file
apm-collector/apm-collector-boot/src/main/resources/application.yml
0 → 100644
浏览文件 @
2042709b
cluster
:
zookeeper
:
hostPort
:
localhost:2181
sessionTimeout
:
100000
# redis:
# host: localhost
# port: 6379
queue
:
disruptor
:
on
data_carrier
:
off
agentstream
:
grpc
:
host
:
localhost
port
:
1000
jetty
:
host
:
localhost
port
:
2000
context_path
:
/
discovery
:
grpc
:
localhost
port
:
1000
ui
:
jetty
:
host
:
localhost
port
:
12800
storage
:
elasticsearch
:
cluster_name
:
CollectorDBCluster
cluster_transport_sniffer
:
true
cluster_nodes
:
127.0.0.1:9300
apm-collector/apm-collector-client/pom.xml
浏览文件 @
2042709b
...
...
@@ -31,7 +31,7 @@
<dependency>
<groupId>
org.elasticsearch.client
</groupId>
<artifactId>
transport
</artifactId>
<version>
5.
2.2
</version>
<version>
5.
5.0
</version>
<exclusions>
<exclusion>
<artifactId>
snakeyaml
</artifactId>
...
...
apm-collector/apm-collector-client/src/main/java/org/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
浏览文件 @
2042709b
...
...
@@ -4,8 +4,13 @@ import java.net.InetAddress;
import
java.net.UnknownHostException
;
import
java.util.LinkedList
;
import
java.util.List
;
import
org.elasticsearch.action.admin.indices.create.CreateIndexResponse
;
import
org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse
;
import
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
;
import
org.elasticsearch.client.IndicesAdminClient
;
import
org.elasticsearch.common.settings.Settings
;
import
org.elasticsearch.common.transport.InetSocketTransportAddress
;
import
org.elasticsearch.common.xcontent.XContentBuilder
;
import
org.elasticsearch.transport.client.PreBuiltTransportClient
;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.client.ClientException
;
...
...
@@ -23,11 +28,11 @@ public class ElasticSearchClient implements Client {
private
final
String
clusterName
;
private
final
String
clusterTransportSniffer
;
private
final
Boolean
clusterTransportSniffer
;
private
final
String
clusterNodes
;
public
ElasticSearchClient
(
String
clusterName
,
String
clusterTransportSniffer
,
String
clusterNodes
)
{
public
ElasticSearchClient
(
String
clusterName
,
Boolean
clusterTransportSniffer
,
String
clusterNodes
)
{
this
.
clusterName
=
clusterName
;
this
.
clusterTransportSniffer
=
clusterTransportSniffer
;
this
.
clusterNodes
=
clusterNodes
;
...
...
@@ -74,4 +79,24 @@ public class ElasticSearchClient implements Client {
this
.
port
=
port
;
}
}
public
boolean
createIndex
(
String
indexName
,
String
indexType
,
Settings
settings
,
XContentBuilder
mappingBuilder
)
{
IndicesAdminClient
adminClient
=
client
.
admin
().
indices
();
CreateIndexResponse
response
=
adminClient
.
prepareCreate
(
indexName
).
setSettings
(
settings
).
addMapping
(
indexType
,
mappingBuilder
).
get
();
logger
.
info
(
"create {} index with type of {} finished, isAcknowledged: {}"
,
indexName
,
indexType
,
response
.
isAcknowledged
());
return
response
.
isShardsAcked
();
}
public
boolean
deleteIndex
(
String
indexName
)
{
IndicesAdminClient
adminClient
=
client
.
admin
().
indices
();
DeleteIndexResponse
response
=
adminClient
.
prepareDelete
(
indexName
).
get
();
logger
.
info
(
"delete {} index finished, isAcknowledged: {}"
,
indexName
,
response
.
isAcknowledged
());
return
response
.
isAcknowledged
();
}
public
boolean
isExistsIndex
(
String
indexName
)
{
IndicesAdminClient
adminClient
=
client
.
admin
().
indices
();
IndicesExistsResponse
response
=
adminClient
.
prepareExists
(
indexName
).
get
();
return
response
.
isExists
();
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/framework/Writer.java
已删除
100644 → 0
浏览文件 @
706874c9
package
org.skywalking.apm.collector.core.framework
;
/**
* @author pengys5
*/
public
interface
Writer
{
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/ColumnDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
/**
* @author pengys5
*/
public
abstract
class
ColumnDefine
{
private
final
String
name
;
private
final
String
type
;
public
ColumnDefine
(
String
name
,
String
type
)
{
this
.
name
=
name
;
this
.
type
=
type
;
}
public
final
String
getName
()
{
return
name
;
}
public
String
getType
()
{
return
type
;
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/Storage.java
已删除
100644 → 0
浏览文件 @
706874c9
package
org.skywalking.apm.collector.core.storage
;
/**
* @author pengys5
*/
public
interface
Storage
{
void
initialize
()
throws
StorageException
;
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefineLoader.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
import
java.util.LinkedList
;
import
java.util.List
;
import
org.skywalking.apm.collector.core.config.ConfigException
;
import
org.skywalking.apm.collector.core.framework.Loader
;
import
org.skywalking.apm.collector.core.util.DefinitionLoader
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
StorageDefineLoader
implements
Loader
<
List
<
TableDefine
>>
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
StorageDefineLoader
.
class
);
@Override
public
List
<
TableDefine
>
load
()
throws
ConfigException
{
List
<
TableDefine
>
tableDefines
=
new
LinkedList
<>();
StorageDefinitionFile
definitionFile
=
new
StorageDefinitionFile
();
logger
.
info
(
"storage definition file name: {}"
,
definitionFile
.
fileName
());
DefinitionLoader
<
TableDefine
>
definitionLoader
=
DefinitionLoader
.
load
(
TableDefine
.
class
,
definitionFile
);
for
(
TableDefine
tableDefine
:
definitionLoader
)
{
logger
.
info
(
"loaded storage definition class: {}"
,
tableDefine
.
getClass
().
getName
());
tableDefines
.
add
(
tableDefine
);
}
return
tableDefines
;
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageDefinitionFile.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
import
org.skywalking.apm.collector.core.framework.DefinitionFile
;
/**
* @author pengys5
*/
public
class
StorageDefinitionFile
extends
DefinitionFile
{
@Override
protected
String
fileName
()
{
return
"storage.define"
;
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstallException.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
/**
* @author pengys5
*/
public
class
StorageInstallException
extends
StorageException
{
public
StorageInstallException
(
String
message
)
{
super
(
message
);
}
public
StorageInstallException
(
String
message
,
Throwable
cause
)
{
super
(
message
,
cause
);
}
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/StorageInstaller.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
import
java.util.List
;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.config.ConfigException
;
/**
* @author pengys5
*/
public
abstract
class
StorageInstaller
{
public
final
void
install
(
Client
client
)
throws
StorageException
{
StorageDefineLoader
defineLoader
=
new
StorageDefineLoader
();
try
{
List
<
TableDefine
>
tableDefines
=
defineLoader
.
load
();
defineFilter
(
tableDefines
);
for
(
TableDefine
tableDefine
:
tableDefines
)
{
if
(
isExists
(
client
,
tableDefine
))
{
deleteIndex
(
client
,
tableDefine
);
}
else
{
createTable
(
client
,
tableDefine
);
}
}
}
catch
(
ConfigException
e
)
{
throw
new
StorageInstallException
(
e
.
getMessage
(),
e
);
}
}
protected
abstract
void
defineFilter
(
List
<
TableDefine
>
tableDefines
);
protected
abstract
boolean
isExists
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
;
protected
abstract
boolean
deleteIndex
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
;
protected
abstract
boolean
createTable
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
;
}
apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/storage/TableDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.core.storage
;
import
java.util.LinkedList
;
import
java.util.List
;
/**
* @author pengys5
*/
public
abstract
class
TableDefine
{
private
final
String
name
;
private
final
List
<
ColumnDefine
>
columnDefines
;
public
TableDefine
(
String
name
)
{
this
.
name
=
name
;
this
.
columnDefines
=
new
LinkedList
<>();
}
public
abstract
void
initialize
();
public
final
void
addColumn
(
ColumnDefine
columnDefine
)
{
columnDefines
.
add
(
columnDefine
);
}
public
String
getName
()
{
return
name
;
}
public
List
<
ColumnDefine
>
getColumnDefines
()
{
return
columnDefines
;
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleDefine.java
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage
;
import
java.util.Map
;
import
org.skywalking.apm.collector.c
luster.ClusterModuleGroupDefine
;
import
org.skywalking.apm.collector.c
ore.client.Client
;
import
org.skywalking.apm.collector.core.client.ClientException
;
import
org.skywalking.apm.collector.core.cluster.ClusterDataListener
;
import
org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine
;
...
...
@@ -12,6 +12,8 @@ import org.skywalking.apm.collector.core.module.ModuleDefine;
import
org.skywalking.apm.collector.core.module.ModuleRegistration
;
import
org.skywalking.apm.collector.core.server.Server
;
import
org.skywalking.apm.collector.core.server.ServerHolder
;
import
org.skywalking.apm.collector.core.storage.StorageException
;
import
org.skywalking.apm.collector.core.storage.StorageInstaller
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -26,10 +28,14 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
public
final
void
initialize
(
Map
config
,
ServerHolder
serverHolder
)
throws
DefineException
,
ClientException
{
try
{
configParser
().
parse
(
config
);
StorageModuleContext
context
=
new
StorageModuleContext
(
ClusterModuleGroupDefine
.
GROUP_NAME
);
context
.
setClient
(
createClient
(
null
));
CollectorContextHelper
.
INSTANCE
.
putContext
(
context
);
}
catch
(
ConfigParseException
e
)
{
StorageModuleContext
context
=
(
StorageModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StorageModuleGroupDefine
.
GROUP_NAME
);
Client
client
=
createClient
(
null
);
client
.
initialize
();
context
.
setClient
(
client
);
storageInstaller
().
install
(
client
);
}
catch
(
ConfigParseException
|
StorageException
e
)
{
throw
new
StorageModuleException
(
e
.
getMessage
(),
e
);
}
}
...
...
@@ -49,4 +55,6 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
@Override
public
final
boolean
defaultModule
()
{
return
true
;
}
public
abstract
StorageInstaller
storageInstaller
();
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/StorageModuleInstaller.java
浏览文件 @
2042709b
...
...
@@ -19,7 +19,7 @@ public class StorageModuleInstaller extends SingleModuleInstaller {
@Override
public
void
install
(
Map
<
String
,
Map
>
moduleConfig
,
Map
<
String
,
ModuleDefine
>
moduleDefineMap
,
ServerHolder
serverHolder
)
throws
DefineException
,
ClientException
{
logger
.
info
(
"beginning
agent stream
module install"
);
logger
.
info
(
"beginning
storage
module install"
);
StorageModuleContext
context
=
new
StorageModuleContext
(
StorageModuleGroupDefine
.
GROUP_NAME
);
CollectorContextHelper
.
INSTANCE
.
putContext
(
context
);
...
...
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/ElasticSearchStorageException.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.elasticsearch
;
import
org.skywalking.apm.collector.core.storage.StorageException
;
/**
* @author pengys5
*/
public
class
ElasticSearchStorageException
extends
StorageException
{
public
ElasticSearchStorageException
(
String
message
)
{
super
(
message
);
}
public
ElasticSearchStorageException
(
String
message
,
Throwable
cause
)
{
super
(
message
,
cause
);
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfig.java
浏览文件 @
2042709b
...
...
@@ -5,6 +5,6 @@ package org.skywalking.apm.collector.storage.elasticsearch;
*/
public
class
StorageElasticSearchConfig
{
public
static
String
CLUSTER_NAME
;
public
static
String
CLUSTER_TRANSPORT_SNIFFER
;
public
static
Boolean
CLUSTER_TRANSPORT_SNIFFER
;
public
static
String
CLUSTER_NODES
;
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchConfigParser.java
浏览文件 @
2042709b
...
...
@@ -20,7 +20,7 @@ public class StorageElasticSearchConfigParser implements ModuleConfigParser {
StorageElasticSearchConfig
.
CLUSTER_NAME
=
(
String
)
config
.
get
(
CLUSTER_NAME
);
}
if
(
ObjectUtils
.
isNotEmpty
(
config
)
&&
StringUtils
.
isNotEmpty
(
config
.
get
(
CLUSTER_TRANSPORT_SNIFFER
)))
{
StorageElasticSearchConfig
.
CLUSTER_TRANSPORT_SNIFFER
=
(
String
)
config
.
get
(
CLUSTER_TRANSPORT_SNIFFER
);
StorageElasticSearchConfig
.
CLUSTER_TRANSPORT_SNIFFER
=
(
Boolean
)
config
.
get
(
CLUSTER_TRANSPORT_SNIFFER
);
}
if
(
ObjectUtils
.
isNotEmpty
(
config
)
&&
StringUtils
.
isNotEmpty
(
config
.
get
(
CLUSTER_NODES
)))
{
StorageElasticSearchConfig
.
CLUSTER_NODES
=
(
String
)
config
.
get
(
CLUSTER_NODES
);
...
...
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/StorageElasticSearchModuleDefine.java
浏览文件 @
2042709b
...
...
@@ -4,8 +4,10 @@ import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.client.DataMonitor
;
import
org.skywalking.apm.collector.core.module.ModuleConfigParser
;
import
org.skywalking.apm.collector.core.storage.StorageInstaller
;
import
org.skywalking.apm.collector.storage.StorageModuleDefine
;
import
org.skywalking.apm.collector.storage.StorageModuleGroupDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchStorageInstaller
;
/**
* @author pengys5
...
...
@@ -29,4 +31,8 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
@Override
protected
Client
createClient
(
DataMonitor
dataMonitor
)
{
return
new
ElasticSearchClient
(
StorageElasticSearchConfig
.
CLUSTER_NAME
,
StorageElasticSearchConfig
.
CLUSTER_TRANSPORT_SNIFFER
,
StorageElasticSearchConfig
.
CLUSTER_NODES
);
}
@Override
public
StorageInstaller
storageInstaller
()
{
return
new
ElasticSearchStorageInstaller
();
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchColumnDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.elasticsearch.define
;
import
org.skywalking.apm.collector.core.storage.ColumnDefine
;
/**
* @author pengys5
*/
public
class
ElasticSearchColumnDefine
extends
ColumnDefine
{
public
ElasticSearchColumnDefine
(
String
name
,
String
type
)
{
super
(
name
,
type
);
}
public
enum
Type
{
Binary
,
Boolean
,
Date
,
Keyword
,
Long
,
Text
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchStorageInstaller.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.elasticsearch.define
;
import
java.io.IOException
;
import
java.util.List
;
import
org.elasticsearch.common.settings.Settings
;
import
org.elasticsearch.common.xcontent.XContentBuilder
;
import
org.elasticsearch.common.xcontent.XContentFactory
;
import
org.elasticsearch.index.IndexNotFoundException
;
import
org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient
;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.storage.ColumnDefine
;
import
org.skywalking.apm.collector.core.storage.StorageInstaller
;
import
org.skywalking.apm.collector.core.storage.TableDefine
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
ElasticSearchStorageInstaller
extends
StorageInstaller
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
ElasticSearchStorageInstaller
.
class
);
@Override
protected
void
defineFilter
(
List
<
TableDefine
>
tableDefines
)
{
int
size
=
tableDefines
.
size
();
for
(
int
i
=
size
-
1
;
i
>=
0
;
i
--)
{
if
(!(
tableDefines
.
get
(
i
)
instanceof
ElasticSearchTableDefine
))
{
tableDefines
.
remove
(
i
);
}
}
}
@Override
protected
boolean
createTable
(
Client
client
,
TableDefine
tableDefine
)
{
ElasticSearchClient
esClient
=
(
ElasticSearchClient
)
client
;
ElasticSearchTableDefine
esTableDefine
=
(
ElasticSearchTableDefine
)
tableDefine
;
// settings
String
settingSource
=
""
;
// mapping
XContentBuilder
mappingBuilder
=
null
;
try
{
XContentBuilder
settingsBuilder
=
createSettingBuilder
(
esTableDefine
);
settingSource
=
settingsBuilder
.
string
();
mappingBuilder
=
createMappingBuilder
(
esTableDefine
);
logger
.
info
(
"mapping builder str: {}"
,
mappingBuilder
.
string
());
}
catch
(
Exception
e
)
{
logger
.
error
(
"create {} index mapping builder error"
,
esTableDefine
.
getName
());
}
Settings
settings
=
Settings
.
builder
().
loadFromSource
(
settingSource
).
build
();
boolean
isAcknowledged
=
esClient
.
createIndex
(
esTableDefine
.
getName
(),
esTableDefine
.
type
(),
settings
,
mappingBuilder
);
logger
.
info
(
"create {} index with type of {} finished, isAcknowledged: {}"
,
esTableDefine
.
getName
(),
esTableDefine
.
type
(),
isAcknowledged
);
return
isAcknowledged
;
}
private
XContentBuilder
createSettingBuilder
(
ElasticSearchTableDefine
tableDefine
)
throws
IOException
{
return
XContentFactory
.
jsonBuilder
()
.
startObject
()
.
field
(
"index.number_of_shards"
,
tableDefine
.
numberOfShards
())
.
field
(
"index.number_of_replicas"
,
tableDefine
.
numberOfReplicas
())
.
field
(
"index.refresh_interval"
,
String
.
valueOf
(
tableDefine
.
refreshInterval
())
+
"s"
)
.
endObject
();
}
private
XContentBuilder
createMappingBuilder
(
ElasticSearchTableDefine
tableDefine
)
throws
IOException
{
XContentBuilder
mappingBuilder
=
XContentFactory
.
jsonBuilder
()
.
startObject
()
.
startObject
(
"properties"
);
for
(
ColumnDefine
columnDefine
:
tableDefine
.
getColumnDefines
())
{
ElasticSearchColumnDefine
elasticSearchColumnDefine
=
(
ElasticSearchColumnDefine
)
columnDefine
;
mappingBuilder
.
startObject
(
elasticSearchColumnDefine
.
getName
())
.
field
(
"type"
,
elasticSearchColumnDefine
.
getType
())
.
endObject
();
}
mappingBuilder
.
endObject
()
.
endObject
();
return
mappingBuilder
;
}
@Override
protected
boolean
deleteIndex
(
Client
client
,
TableDefine
tableDefine
)
{
ElasticSearchClient
esClient
=
(
ElasticSearchClient
)
client
;
try
{
return
esClient
.
deleteIndex
(
tableDefine
.
getName
());
}
catch
(
IndexNotFoundException
e
)
{
logger
.
info
(
"{} index not found"
,
tableDefine
.
getName
());
}
return
false
;
}
@Override
protected
boolean
isExists
(
Client
client
,
TableDefine
tableDefine
)
{
ElasticSearchClient
esClient
=
(
ElasticSearchClient
)
client
;
return
esClient
.
isExistsIndex
(
tableDefine
.
getName
());
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchTableDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.elasticsearch.define
;
import
org.skywalking.apm.collector.core.storage.TableDefine
;
/**
* @author pengys5
*/
public
abstract
class
ElasticSearchTableDefine
extends
TableDefine
{
public
ElasticSearchTableDefine
(
String
name
)
{
super
(
name
);
}
public
final
String
type
()
{
return
"type"
;
}
public
abstract
int
refreshInterval
();
public
abstract
int
numberOfShards
();
public
abstract
int
numberOfReplicas
();
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/StorageH2ModuleDefine.java
浏览文件 @
2042709b
...
...
@@ -4,8 +4,10 @@ import org.skywalking.apm.collector.client.h2.H2Client;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.client.DataMonitor
;
import
org.skywalking.apm.collector.core.module.ModuleConfigParser
;
import
org.skywalking.apm.collector.core.storage.StorageInstaller
;
import
org.skywalking.apm.collector.storage.StorageModuleDefine
;
import
org.skywalking.apm.collector.storage.StorageModuleGroupDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2StorageInstaller
;
/**
* @author pengys5
...
...
@@ -29,4 +31,8 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
@Override
protected
Client
createClient
(
DataMonitor
dataMonitor
)
{
return
new
H2Client
();
}
@Override
public
StorageInstaller
storageInstaller
()
{
return
new
H2StorageInstaller
();
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2ColumnDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.h2.define
;
import
org.skywalking.apm.collector.core.storage.ColumnDefine
;
/**
* @author pengys5
*/
public
class
H2ColumnDefine
extends
ColumnDefine
{
public
H2ColumnDefine
(
String
name
,
String
type
)
{
super
(
name
,
type
);
}
public
enum
Type
{
Boolean
,
Varchar
,
Bigint
,
Date
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2StorageInstaller.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.h2.define
;
import
java.util.List
;
import
org.skywalking.apm.collector.client.h2.H2Client
;
import
org.skywalking.apm.collector.client.h2.H2ClientException
;
import
org.skywalking.apm.collector.core.client.Client
;
import
org.skywalking.apm.collector.core.storage.StorageException
;
import
org.skywalking.apm.collector.core.storage.StorageInstallException
;
import
org.skywalking.apm.collector.core.storage.StorageInstaller
;
import
org.skywalking.apm.collector.core.storage.TableDefine
;
/**
* @author pengys5
*/
public
class
H2StorageInstaller
extends
StorageInstaller
{
@Override
protected
void
defineFilter
(
List
<
TableDefine
>
tableDefines
)
{
int
size
=
tableDefines
.
size
();
for
(
int
i
=
size
-
1
;
i
>=
0
;
i
--)
{
if
(!(
tableDefines
.
get
(
i
)
instanceof
H2TableDefine
))
{
tableDefines
.
remove
(
i
);
}
}
}
@Override
protected
boolean
isExists
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
{
return
false
;
}
@Override
protected
boolean
deleteIndex
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
{
return
false
;
}
@Override
protected
boolean
createTable
(
Client
client
,
TableDefine
tableDefine
)
throws
StorageException
{
H2Client
h2Client
=
(
H2Client
)
client
;
H2TableDefine
h2TableDefine
=
(
H2TableDefine
)
tableDefine
;
StringBuilder
sqlBuilder
=
new
StringBuilder
();
sqlBuilder
.
append
(
"CREATE TABLE "
).
append
(
h2TableDefine
.
getName
()).
append
(
" ("
);
h2TableDefine
.
getColumnDefines
().
forEach
(
columnDefine
->
{
H2ColumnDefine
h2ColumnDefine
=
(
H2ColumnDefine
)
columnDefine
;
if
(
h2ColumnDefine
.
getType
().
equals
(
H2ColumnDefine
.
Type
.
Varchar
.
name
()))
{
sqlBuilder
.
append
(
h2ColumnDefine
.
getName
()).
append
(
" "
).
append
(
h2ColumnDefine
.
getType
()).
append
(
"(255)"
);
}
else
{
sqlBuilder
.
append
(
h2ColumnDefine
.
getName
()).
append
(
" "
).
append
(
h2ColumnDefine
.
getType
());
}
});
sqlBuilder
.
append
(
")"
);
try
{
h2Client
.
execute
(
sqlBuilder
.
toString
());
}
catch
(
H2ClientException
e
)
{
throw
new
StorageInstallException
(
e
.
getMessage
(),
e
);
}
return
true
;
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2TableDefine.java
0 → 100644
浏览文件 @
2042709b
package
org.skywalking.apm.collector.storage.h2.define
;
import
org.skywalking.apm.collector.core.storage.TableDefine
;
/**
* @author pengys5
*/
public
abstract
class
H2TableDefine
extends
TableDefine
{
public
H2TableDefine
(
String
name
)
{
super
(
name
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录