Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
阿信在这里
SkyWalking
提交
a798bb64
S
SkyWalking
项目概览
阿信在这里
/
SkyWalking
与 Fork 源项目一致
Fork自
山不在高_有仙则灵 / SkyWalking
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a798bb64
编写于
8月 09, 2017
作者:
P
pengys5
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add cpu metric record persistence, but not test.
上级
00c708fa
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
440 addition
and
32 deletion
+440
-32
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
...llector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
+2
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/JVMMetricsServiceHandler.java
...or/agentstream/grpc/handler/JVMMetricsServiceHandler.java
+56
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/CpuMetricPersistenceWorker.java
...ream/worker/jvmmetric/cpu/CpuMetricPersistenceWorker.java
+71
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/CpuMetricEsDAO.java
.../agentstream/worker/jvmmetric/cpu/dao/CpuMetricEsDAO.java
+34
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/CpuMetricH2DAO.java
.../agentstream/worker/jvmmetric/cpu/dao/CpuMetricH2DAO.java
+9
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/ICpuMetricDAO.java
...r/agentstream/worker/jvmmetric/cpu/dao/ICpuMetricDAO.java
+7
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricDataDefine.java
...ream/worker/jvmmetric/cpu/define/CpuMetricDataDefine.java
+102
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricEsTableDefine.java
...m/worker/jvmmetric/cpu/define/CpuMetricEsTableDefine.java
+32
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricH2TableDefine.java
...m/worker/jvmmetric/cpu/define/CpuMetricH2TableDefine.java
+21
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricTable.java
...entstream/worker/jvmmetric/cpu/define/CpuMetricTable.java
+12
-0
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/util/TimeBucketUtils.java
...pm/collector/agentstream/worker/util/TimeBucketUtils.java
+8
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/es_dao.define
...tstream/src/main/resources/META-INF/defines/es_dao.define
+2
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/h2_dao.define
...tstream/src/main/resources/META-INF/defines/h2_dao.define
+2
-1
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_worker_provider.define
...n/resources/META-INF/defines/local_worker_provider.define
+2
-0
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
...stream/src/main/resources/META-INF/defines/storage.define
+4
-1
apm-collector/apm-collector-agentstream/src/test/java/org/skywalking/apm/collector/agentstream/util/TimeBucketUtilsTestCase.java
...m/collector/agentstream/util/TimeBucketUtilsTestCase.java
+47
-0
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchColumnDefine.java
...orage/elasticsearch/define/ElasticSearchColumnDefine.java
+1
-1
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2ColumnDefine.java
...lking/apm/collector/storage/h2/define/H2ColumnDefine.java
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java
.../apm/collector/stream/worker/impl/data/AttributeType.java
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Data.java
...kywalking/apm/collector/stream/worker/impl/data/Data.java
+12
-12
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefine.java
...ing/apm/collector/stream/worker/impl/data/DataDefine.java
+8
-8
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Operation.java
...king/apm/collector/stream/worker/impl/data/Operation.java
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/AddOperation.java
...llector/stream/worker/impl/data/operate/AddOperation.java
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/CoverOperation.java
...ector/stream/worker/impl/data/operate/CoverOperation.java
+1
-1
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/NonOperation.java
...llector/stream/worker/impl/data/operate/NonOperation.java
+1
-1
apm-collector/apm-collector-stream/src/main/proto/RemoteCommonService.proto
...collector-stream/src/main/proto/RemoteCommonService.proto
+2
-2
未找到文件。
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/AgentStreamGRPCModuleDefine.java
浏览文件 @
a798bb64
...
...
@@ -4,6 +4,7 @@ import java.util.LinkedList;
import
java.util.List
;
import
org.skywalking.apm.collector.agentstream.AgentStreamModuleDefine
;
import
org.skywalking.apm.collector.agentstream.AgentStreamModuleGroupDefine
;
import
org.skywalking.apm.collector.agentstream.grpc.handler.JVMMetricsServiceHandler
;
import
org.skywalking.apm.collector.agentstream.grpc.handler.TraceSegmentServiceHandler
;
import
org.skywalking.apm.collector.core.cluster.ClusterDataListener
;
import
org.skywalking.apm.collector.core.framework.Handler
;
...
...
@@ -46,6 +47,7 @@ public class AgentStreamGRPCModuleDefine extends AgentStreamModuleDefine {
@Override
public
List
<
Handler
>
handlerList
()
{
List
<
Handler
>
handlers
=
new
LinkedList
<>();
handlers
.
add
(
new
TraceSegmentServiceHandler
());
handlers
.
add
(
new
JVMMetricsServiceHandler
());
return
handlers
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/grpc/handler/JVMMetricsServiceHandler.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.grpc.handler
;
import
io.grpc.stub.StreamObserver
;
import
org.skywalking.apm.collector.agentstream.worker.Const
;
import
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker
;
import
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricDataDefine
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
import
org.skywalking.apm.collector.core.framework.CollectorContextHelper
;
import
org.skywalking.apm.collector.server.grpc.GRPCHandler
;
import
org.skywalking.apm.collector.stream.StreamModuleContext
;
import
org.skywalking.apm.collector.stream.StreamModuleGroupDefine
;
import
org.skywalking.apm.collector.stream.worker.WorkerInvokeException
;
import
org.skywalking.apm.collector.stream.worker.WorkerNotFoundException
;
import
org.skywalking.apm.network.proto.CPU
;
import
org.skywalking.apm.network.proto.Downstream
;
import
org.skywalking.apm.network.proto.JVMMetrics
;
import
org.skywalking.apm.network.proto.JVMMetricsServiceGrpc
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
/**
* @author pengys5
*/
public
class
JVMMetricsServiceHandler
extends
JVMMetricsServiceGrpc
.
JVMMetricsServiceImplBase
implements
GRPCHandler
{
private
final
Logger
logger
=
LoggerFactory
.
getLogger
(
JVMMetricsServiceHandler
.
class
);
@Override
public
void
collect
(
JVMMetrics
request
,
StreamObserver
<
Downstream
>
responseObserver
)
{
int
applicationInstanceId
=
request
.
getApplicationInstanceId
();
logger
.
debug
(
"receive the jvm metric from application instance, id: {}"
,
applicationInstanceId
);
StreamModuleContext
context
=
(
StreamModuleContext
)
CollectorContextHelper
.
INSTANCE
.
getContext
(
StreamModuleGroupDefine
.
GROUP_NAME
);
request
.
getMetricsList
().
forEach
(
metric
->
{
long
time
=
TimeBucketUtils
.
INSTANCE
.
getSecondTimeBucket
(
metric
.
getTime
());
sendToCpuMetricPersistenceWorker
(
context
,
applicationInstanceId
,
time
,
metric
.
getCpu
());
});
responseObserver
.
onNext
(
Downstream
.
newBuilder
().
build
());
responseObserver
.
onCompleted
();
}
private
void
sendToCpuMetricPersistenceWorker
(
StreamModuleContext
context
,
int
applicationInstanceId
,
long
timeBucket
,
CPU
cpu
)
{
CpuMetricDataDefine
.
CpuMetric
cpuMetric
=
new
CpuMetricDataDefine
.
CpuMetric
();
cpuMetric
.
setId
(
timeBucket
+
Const
.
ID_SPLIT
+
applicationInstanceId
);
cpuMetric
.
setApplicationInstanceId
(
applicationInstanceId
);
cpuMetric
.
setUsagePercent
(
cpu
.
getUsagePercent
());
cpuMetric
.
setTimeBucket
(
timeBucket
);
try
{
logger
.
debug
(
"send to cpu metric persistence worker, id: {}"
,
cpuMetric
.
getId
());
context
.
getClusterWorkerContext
().
lookup
(
CpuMetricPersistenceWorker
.
WorkerRole
.
INSTANCE
).
tell
(
cpuMetric
.
toData
());
}
catch
(
WorkerInvokeException
|
WorkerNotFoundException
e
)
{
logger
.
error
(
e
.
getMessage
(),
e
);
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/CpuMetricPersistenceWorker.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu
;
import
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.ICpuMetricDAO
;
import
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricDataDefine
;
import
org.skywalking.apm.collector.storage.dao.DAOContainer
;
import
org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider
;
import
org.skywalking.apm.collector.stream.worker.ClusterWorkerContext
;
import
org.skywalking.apm.collector.stream.worker.ProviderNotFoundException
;
import
org.skywalking.apm.collector.stream.worker.Role
;
import
org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker
;
import
org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector
;
import
org.skywalking.apm.collector.stream.worker.selector.WorkerSelector
;
/**
* @author pengys5
*/
public
class
CpuMetricPersistenceWorker
extends
PersistenceWorker
{
public
CpuMetricPersistenceWorker
(
Role
role
,
ClusterWorkerContext
clusterContext
)
{
super
(
role
,
clusterContext
);
}
@Override
public
void
preStart
()
throws
ProviderNotFoundException
{
super
.
preStart
();
}
@Override
protected
boolean
needMergeDBData
()
{
return
false
;
}
@Override
protected
IPersistenceDAO
persistenceDAO
()
{
return
(
IPersistenceDAO
)
DAOContainer
.
INSTANCE
.
get
(
ICpuMetricDAO
.
class
.
getName
());
}
public
static
class
Factory
extends
AbstractLocalAsyncWorkerProvider
<
CpuMetricPersistenceWorker
>
{
@Override
public
Role
role
()
{
return
WorkerRole
.
INSTANCE
;
}
@Override
public
CpuMetricPersistenceWorker
workerInstance
(
ClusterWorkerContext
clusterContext
)
{
return
new
CpuMetricPersistenceWorker
(
role
(),
clusterContext
);
}
@Override
public
int
queueSize
()
{
return
1024
;
}
}
public
enum
WorkerRole
implements
Role
{
INSTANCE
;
@Override
public
String
roleName
()
{
return
CpuMetricPersistenceWorker
.
class
.
getSimpleName
();
}
@Override
public
WorkerSelector
workerSelector
()
{
return
new
HashCodeSelector
();
}
@Override
public
DataDefine
dataDefine
()
{
return
new
CpuMetricDataDefine
();
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/CpuMetricEsDAO.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao
;
import
java.util.HashMap
;
import
java.util.Map
;
import
org.elasticsearch.action.index.IndexRequestBuilder
;
import
org.elasticsearch.action.update.UpdateRequestBuilder
;
import
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricTable
;
import
org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO
;
import
org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
/**
* @author pengys5
*/
public
class
CpuMetricEsDAO
extends
EsDAO
implements
ICpuMetricDAO
,
IPersistenceDAO
<
IndexRequestBuilder
,
UpdateRequestBuilder
>
{
@Override
public
Data
get
(
String
id
,
DataDefine
dataDefine
)
{
return
null
;
}
@Override
public
IndexRequestBuilder
prepareBatchInsert
(
Data
data
)
{
Map
<
String
,
Object
>
source
=
new
HashMap
<>();
source
.
put
(
CpuMetricTable
.
COLUMN_APPLICATION_INSTANCE_ID
,
data
.
getDataInteger
(
0
));
source
.
put
(
CpuMetricTable
.
COLUMN_USAGE_PERCENT
,
data
.
getDataDouble
(
0
));
source
.
put
(
CpuMetricTable
.
COLUMN_TIME_BUCKET
,
data
.
getDataLong
(
0
));
return
getClient
().
prepareIndex
(
CpuMetricTable
.
TABLE
,
data
.
getDataString
(
0
)).
setSource
(
source
);
}
@Override
public
UpdateRequestBuilder
prepareBatchUpdate
(
Data
data
)
{
return
null
;
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/CpuMetricH2DAO.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao
;
import
org.skywalking.apm.collector.storage.h2.dao.H2DAO
;
/**
* @author pengys5
*/
public
class
CpuMetricH2DAO
extends
H2DAO
implements
ICpuMetricDAO
{
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/dao/ICpuMetricDAO.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao
;
/**
* @author pengys5
*/
public
interface
ICpuMetricDAO
{
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricDataDefine.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define
;
import
org.skywalking.apm.collector.remote.grpc.proto.RemoteData
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Attribute
;
import
org.skywalking.apm.collector.stream.worker.impl.data.AttributeType
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Data
;
import
org.skywalking.apm.collector.stream.worker.impl.data.DataDefine
;
import
org.skywalking.apm.collector.stream.worker.impl.data.Transform
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation
;
import
org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
;
/**
* @author pengys5
*/
public
class
CpuMetricDataDefine
extends
DataDefine
{
@Override
protected
int
initialCapacity
()
{
return
4
;
}
@Override
protected
void
attributeDefine
()
{
addAttribute
(
0
,
new
Attribute
(
CpuMetricTable
.
COLUMN_ID
,
AttributeType
.
STRING
,
new
NonOperation
()));
addAttribute
(
1
,
new
Attribute
(
CpuMetricTable
.
COLUMN_APPLICATION_INSTANCE_ID
,
AttributeType
.
INTEGER
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
CpuMetricTable
.
COLUMN_USAGE_PERCENT
,
AttributeType
.
DOUBLE
,
new
CoverOperation
()));
addAttribute
(
2
,
new
Attribute
(
CpuMetricTable
.
COLUMN_TIME_BUCKET
,
AttributeType
.
LONG
,
new
CoverOperation
()));
}
@Override
public
Object
deserialize
(
RemoteData
remoteData
)
{
return
null
;
}
@Override
public
RemoteData
serialize
(
Object
object
)
{
return
null
;
}
public
static
class
CpuMetric
implements
Transform
<
CpuMetric
>
{
private
String
id
;
private
int
applicationInstanceId
;
private
double
usagePercent
;
private
long
timeBucket
;
public
CpuMetric
(
String
id
,
int
applicationInstanceId
,
double
usagePercent
,
long
timeBucket
)
{
this
.
id
=
id
;
this
.
applicationInstanceId
=
applicationInstanceId
;
this
.
usagePercent
=
usagePercent
;
this
.
timeBucket
=
timeBucket
;
}
public
CpuMetric
()
{
}
@Override
public
Data
toData
()
{
CpuMetricDataDefine
define
=
new
CpuMetricDataDefine
();
Data
data
=
define
.
build
(
id
);
data
.
setDataString
(
0
,
this
.
id
);
data
.
setDataInteger
(
0
,
this
.
applicationInstanceId
);
data
.
setDataDouble
(
0
,
this
.
usagePercent
);
data
.
setDataLong
(
0
,
this
.
timeBucket
);
return
data
;
}
@Override
public
CpuMetric
toSelf
(
Data
data
)
{
this
.
id
=
data
.
getDataString
(
0
);
this
.
applicationInstanceId
=
data
.
getDataInteger
(
0
);
this
.
usagePercent
=
data
.
getDataDouble
(
0
);
this
.
timeBucket
=
data
.
getDataLong
(
0
);
return
this
;
}
public
void
setId
(
String
id
)
{
this
.
id
=
id
;
}
public
void
setApplicationInstanceId
(
int
applicationInstanceId
)
{
this
.
applicationInstanceId
=
applicationInstanceId
;
}
public
void
setUsagePercent
(
double
usagePercent
)
{
this
.
usagePercent
=
usagePercent
;
}
public
void
setTimeBucket
(
long
timeBucket
)
{
this
.
timeBucket
=
timeBucket
;
}
public
String
getId
()
{
return
id
;
}
public
int
getApplicationInstanceId
()
{
return
applicationInstanceId
;
}
public
double
getUsagePercent
()
{
return
usagePercent
;
}
public
long
getTimeBucket
()
{
return
timeBucket
;
}
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricEsTableDefine.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine
;
import
org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine
;
/**
* @author pengys5
*/
public
class
CpuMetricEsTableDefine
extends
ElasticSearchTableDefine
{
public
CpuMetricEsTableDefine
()
{
super
(
CpuMetricTable
.
TABLE
);
}
@Override
public
int
refreshInterval
()
{
return
1
;
}
@Override
public
int
numberOfShards
()
{
return
2
;
}
@Override
public
int
numberOfReplicas
()
{
return
0
;
}
@Override
public
void
initialize
()
{
addColumn
(
new
ElasticSearchColumnDefine
(
CpuMetricTable
.
COLUMN_APPLICATION_INSTANCE_ID
,
ElasticSearchColumnDefine
.
Type
.
Integer
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
CpuMetricTable
.
COLUMN_USAGE_PERCENT
,
ElasticSearchColumnDefine
.
Type
.
Double
.
name
()));
addColumn
(
new
ElasticSearchColumnDefine
(
CpuMetricTable
.
COLUMN_TIME_BUCKET
,
ElasticSearchColumnDefine
.
Type
.
Long
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricH2TableDefine.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define
;
import
org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine
;
import
org.skywalking.apm.collector.storage.h2.define.H2TableDefine
;
/**
* @author pengys5
*/
public
class
CpuMetricH2TableDefine
extends
H2TableDefine
{
public
CpuMetricH2TableDefine
()
{
super
(
CpuMetricTable
.
TABLE
);
}
@Override
public
void
initialize
()
{
addColumn
(
new
H2ColumnDefine
(
CpuMetricTable
.
COLUMN_ID
,
H2ColumnDefine
.
Type
.
Varchar
.
name
()));
addColumn
(
new
H2ColumnDefine
(
CpuMetricTable
.
COLUMN_APPLICATION_INSTANCE_ID
,
H2ColumnDefine
.
Type
.
Int
.
name
()));
addColumn
(
new
H2ColumnDefine
(
CpuMetricTable
.
COLUMN_USAGE_PERCENT
,
H2ColumnDefine
.
Type
.
Double
.
name
()));
addColumn
(
new
H2ColumnDefine
(
CpuMetricTable
.
COLUMN_TIME_BUCKET
,
H2ColumnDefine
.
Type
.
Bigint
.
name
()));
}
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/jvmmetric/cpu/define/CpuMetricTable.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define
;
import
org.skywalking.apm.collector.agentstream.worker.CommonTable
;
/**
* @author pengys5
*/
public
class
CpuMetricTable
extends
CommonTable
{
public
static
final
String
TABLE
=
"cpu_metric"
;
public
static
final
String
COLUMN_APPLICATION_INSTANCE_ID
=
"application_instance_id"
;
public
static
final
String
COLUMN_USAGE_PERCENT
=
"application_instance_id"
;
}
apm-collector/apm-collector-agentstream/src/main/java/org/skywalking/apm/collector/agentstream/worker/util/TimeBucketUtils.java
浏览文件 @
a798bb64
...
...
@@ -13,6 +13,7 @@ public enum TimeBucketUtils {
private
final
SimpleDateFormat
dayDateFormat
=
new
SimpleDateFormat
(
"yyyyMMdd"
);
private
final
SimpleDateFormat
hourDateFormat
=
new
SimpleDateFormat
(
"yyyyMMddHH"
);
private
final
SimpleDateFormat
minuteDateFormat
=
new
SimpleDateFormat
(
"yyyyMMddHHmm"
);
private
final
SimpleDateFormat
secondDateFormat
=
new
SimpleDateFormat
(
"yyyyMMddHHmmss"
);
public
long
getMinuteTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
...
...
@@ -21,6 +22,13 @@ public enum TimeBucketUtils {
return
Long
.
valueOf
(
timeStr
);
}
public
long
getSecondTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
time
);
String
timeStr
=
secondDateFormat
.
format
(
calendar
.
getTime
());
return
Long
.
valueOf
(
timeStr
);
}
public
long
getHourTimeBucket
(
long
time
)
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
time
);
...
...
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/es_dao.define
浏览文件 @
a798bb64
...
...
@@ -9,4 +9,5 @@ org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumEs
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostEsDAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceEsDAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryEsDAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefEsDAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricEsDAO
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/h2_dao.define
浏览文件 @
a798bb64
...
...
@@ -9,4 +9,5 @@ org.skywalking.apm.collector.agentstream.worker.noderef.summary.dao.NodeRefSumH2
org.skywalking.apm.collector.agentstream.worker.segment.cost.dao.SegmentCostH2DAO
org.skywalking.apm.collector.agentstream.worker.global.dao.GlobalTraceH2DAO
org.skywalking.apm.collector.agentstream.worker.service.entry.dao.ServiceEntryH2DAO
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.dao.ServiceRefH2DAO
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.dao.CpuMetricH2DAO
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/local_worker_provider.define
浏览文件 @
a798bb64
...
...
@@ -20,6 +20,8 @@ org.skywalking.apm.collector.agentstream.worker.segment.origin.SegmentPersistenc
org.skywalking.apm.collector.agentstream.worker.segment.cost.SegmentCostPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.global.GlobalTracePersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.CpuMetricPersistenceWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.servicename.ServiceNameRegisterSerialWorker$Factory
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/main/resources/META-INF/defines/storage.define
浏览文件 @
a798bb64
...
...
@@ -32,4 +32,7 @@ org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntr
org.skywalking.apm.collector.agentstream.worker.service.entry.define.ServiceEntryH2TableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefEsTableDefine
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.serviceref.reference.define.ServiceRefH2TableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricEsTableDefine
org.skywalking.apm.collector.agentstream.worker.jvmmetric.cpu.define.CpuMetricH2TableDefine
\ No newline at end of file
apm-collector/apm-collector-agentstream/src/test/java/org/skywalking/apm/collector/agentstream/util/TimeBucketUtilsTestCase.java
0 → 100644
浏览文件 @
a798bb64
package
org.skywalking.apm.collector.agentstream.util
;
import
java.util.Calendar
;
import
java.util.TimeZone
;
import
org.junit.Assert
;
import
org.junit.Test
;
import
org.skywalking.apm.collector.agentstream.worker.util.TimeBucketUtils
;
/**
* @author pengys5
*/
public
class
TimeBucketUtilsTestCase
{
@Test
public
void
testUTCLocation
()
{
TimeZone
.
setDefault
(
TimeZone
.
getTimeZone
(
"UTC"
));
long
timeBucket
=
201703310915L
;
long
changedTimeBucket
=
TimeBucketUtils
.
INSTANCE
.
changeToUTCTimeBucket
(
timeBucket
);
Assert
.
assertEquals
(
201703310115L
,
changedTimeBucket
);
}
@Test
public
void
testUTC8Location
()
{
TimeZone
.
setDefault
(
TimeZone
.
getTimeZone
(
"GMT+08:00"
));
long
timeBucket
=
201703310915L
;
long
changedTimeBucket
=
TimeBucketUtils
.
INSTANCE
.
changeToUTCTimeBucket
(
timeBucket
);
Assert
.
assertEquals
(
201703310915L
,
changedTimeBucket
);
}
@Test
public
void
testGetSecondTimeBucket
()
{
long
timeBucket
=
TimeBucketUtils
.
INSTANCE
.
getSecondTimeBucket
(
1490922929258L
);
Assert
.
assertEquals
(
20170331091529L
,
timeBucket
);
}
@Test
public
void
test
()
{
Calendar
calendar
=
Calendar
.
getInstance
();
calendar
.
setTimeInMillis
(
1490922929258L
);
calendar
.
set
(
Calendar
.
SECOND
,
calendar
.
get
(
Calendar
.
SECOND
)
-
3
);
// System.out.println(calendar.getTimeInMillis());
calendar
.
set
(
Calendar
.
SECOND
,
calendar
.
get
(
Calendar
.
SECOND
)
-
2
);
// System.out.println(calendar.getTimeInMillis());
calendar
.
set
(
Calendar
.
SECOND
,
calendar
.
get
(
Calendar
.
SECOND
)
-
2
);
// System.out.println(calendar.getTimeInMillis());
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/elasticsearch/define/ElasticSearchColumnDefine.java
浏览文件 @
a798bb64
...
...
@@ -11,6 +11,6 @@ public class ElasticSearchColumnDefine extends ColumnDefine {
}
public
enum
Type
{
Binary
,
Boolean
,
Date
,
Keyword
,
Long
,
Integer
Binary
,
Boolean
,
Keyword
,
Long
,
Integer
,
Double
}
}
apm-collector/apm-collector-storage/src/main/java/org/skywalking/apm/collector/storage/h2/define/H2ColumnDefine.java
浏览文件 @
a798bb64
...
...
@@ -12,6 +12,6 @@ public class H2ColumnDefine extends ColumnDefine {
}
public
enum
Type
{
Boolean
,
Varchar
,
Int
,
Bigint
,
BINARY
Boolean
,
Varchar
,
Int
,
Bigint
,
BINARY
,
Double
}
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/AttributeType.java
浏览文件 @
a798bb64
...
...
@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
* @author pengys5
*/
public
enum
AttributeType
{
STRING
,
LONG
,
FLOAT
,
INTEGER
,
BYTE
,
BOOLEAN
STRING
,
LONG
,
DOUBLE
,
INTEGER
,
BYTE
,
BOOLEAN
}
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Data.java
浏览文件 @
a798bb64
...
...
@@ -10,29 +10,29 @@ import org.skywalking.apm.collector.stream.worker.selector.AbstractHashMessage;
public
class
Data
extends
AbstractHashMessage
{
private
final
int
stringCapacity
;
private
final
int
longCapacity
;
private
final
int
float
Capacity
;
private
final
int
double
Capacity
;
private
final
int
integerCapacity
;
private
final
int
booleanCapacity
;
private
final
int
byteCapacity
;
private
String
[]
dataStrings
;
private
Long
[]
dataLongs
;
private
Float
[]
dataFloat
s
;
private
Double
[]
dataDouble
s
;
private
Integer
[]
dataIntegers
;
private
Boolean
[]
dataBooleans
;
private
byte
[][]
dataBytes
;
public
Data
(
String
id
,
int
stringCapacity
,
int
longCapacity
,
int
float
Capacity
,
int
integerCapacity
,
public
Data
(
String
id
,
int
stringCapacity
,
int
longCapacity
,
int
double
Capacity
,
int
integerCapacity
,
int
booleanCapacity
,
int
byteCapacity
)
{
super
(
id
);
this
.
dataStrings
=
new
String
[
stringCapacity
];
this
.
dataLongs
=
new
Long
[
longCapacity
];
this
.
data
Floats
=
new
Float
[
float
Capacity
];
this
.
data
Doubles
=
new
Double
[
double
Capacity
];
this
.
dataIntegers
=
new
Integer
[
integerCapacity
];
this
.
dataBooleans
=
new
Boolean
[
booleanCapacity
];
this
.
dataBytes
=
new
byte
[
byteCapacity
][];
this
.
stringCapacity
=
stringCapacity
;
this
.
longCapacity
=
longCapacity
;
this
.
floatCapacity
=
float
Capacity
;
this
.
doubleCapacity
=
double
Capacity
;
this
.
integerCapacity
=
integerCapacity
;
this
.
booleanCapacity
=
booleanCapacity
;
this
.
byteCapacity
=
byteCapacity
;
...
...
@@ -46,8 +46,8 @@ public class Data extends AbstractHashMessage {
dataLongs
[
position
]
=
value
;
}
public
void
setData
Float
(
int
position
,
Float
value
)
{
data
Float
s
[
position
]
=
value
;
public
void
setData
Double
(
int
position
,
Double
value
)
{
data
Double
s
[
position
]
=
value
;
}
public
void
setDataInteger
(
int
position
,
Integer
value
)
{
...
...
@@ -70,8 +70,8 @@ public class Data extends AbstractHashMessage {
return
dataLongs
[
position
];
}
public
Float
getDataFloat
(
int
position
)
{
return
data
Float
s
[
position
];
public
Double
getDataDouble
(
int
position
)
{
return
data
Double
s
[
position
];
}
public
Integer
getDataInteger
(
int
position
)
{
...
...
@@ -93,7 +93,7 @@ public class Data extends AbstractHashMessage {
public
RemoteData
serialize
()
{
RemoteData
.
Builder
builder
=
RemoteData
.
newBuilder
();
builder
.
setIntegerCapacity
(
integerCapacity
);
builder
.
set
FloatCapacity
(
float
Capacity
);
builder
.
set
DoubleCapacity
(
double
Capacity
);
builder
.
setStringCapacity
(
stringCapacity
);
builder
.
setLongCapacity
(
longCapacity
);
builder
.
setByteCapacity
(
byteCapacity
);
...
...
@@ -105,8 +105,8 @@ public class Data extends AbstractHashMessage {
for
(
int
i
=
0
;
i
<
dataIntegers
.
length
;
i
++)
{
builder
.
setDataIntegers
(
i
,
dataIntegers
[
i
]);
}
for
(
int
i
=
0
;
i
<
data
Float
s
.
length
;
i
++)
{
builder
.
setData
Floats
(
i
,
dataFloat
s
[
i
]);
for
(
int
i
=
0
;
i
<
data
Double
s
.
length
;
i
++)
{
builder
.
setData
Doubles
(
i
,
dataDouble
s
[
i
]);
}
for
(
int
i
=
0
;
i
<
dataLongs
.
length
;
i
++)
{
builder
.
setDataLongs
(
i
,
dataLongs
[
i
]);
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/DataDefine.java
浏览文件 @
a798bb64
...
...
@@ -9,7 +9,7 @@ public abstract class DataDefine {
private
Attribute
[]
attributes
;
private
int
stringCapacity
;
private
int
longCapacity
;
private
int
float
Capacity
;
private
int
double
Capacity
;
private
int
integerCapacity
;
private
int
booleanCapacity
;
private
int
byteCapacity
;
...
...
@@ -26,8 +26,8 @@ public abstract class DataDefine {
stringCapacity
++;
}
else
if
(
AttributeType
.
LONG
.
equals
(
attribute
.
getType
()))
{
longCapacity
++;
}
else
if
(
AttributeType
.
FLOAT
.
equals
(
attribute
.
getType
()))
{
float
Capacity
++;
}
else
if
(
AttributeType
.
DOUBLE
.
equals
(
attribute
.
getType
()))
{
double
Capacity
++;
}
else
if
(
AttributeType
.
INTEGER
.
equals
(
attribute
.
getType
()))
{
integerCapacity
++;
}
else
if
(
AttributeType
.
BOOLEAN
.
equals
(
attribute
.
getType
()))
{
...
...
@@ -47,13 +47,13 @@ public abstract class DataDefine {
protected
abstract
void
attributeDefine
();
public
final
Data
build
(
String
id
)
{
return
new
Data
(
id
,
stringCapacity
,
longCapacity
,
float
Capacity
,
integerCapacity
,
booleanCapacity
,
byteCapacity
);
return
new
Data
(
id
,
stringCapacity
,
longCapacity
,
double
Capacity
,
integerCapacity
,
booleanCapacity
,
byteCapacity
);
}
public
void
mergeData
(
Data
newData
,
Data
oldData
)
{
int
stringPosition
=
0
;
int
longPosition
=
0
;
int
float
Position
=
0
;
int
double
Position
=
0
;
int
integerPosition
=
0
;
int
booleanPosition
=
0
;
int
bytePosition
=
0
;
...
...
@@ -65,9 +65,9 @@ public abstract class DataDefine {
}
else
if
(
AttributeType
.
LONG
.
equals
(
attribute
.
getType
()))
{
attribute
.
getOperation
().
operate
(
newData
.
getDataLong
(
longPosition
),
oldData
.
getDataLong
(
longPosition
));
longPosition
++;
}
else
if
(
AttributeType
.
FLOAT
.
equals
(
attribute
.
getType
()))
{
attribute
.
getOperation
().
operate
(
newData
.
getData
Float
(
floatPosition
),
oldData
.
getDataFloat
(
float
Position
));
float
Position
++;
}
else
if
(
AttributeType
.
DOUBLE
.
equals
(
attribute
.
getType
()))
{
attribute
.
getOperation
().
operate
(
newData
.
getData
Double
(
doublePosition
),
oldData
.
getDataDouble
(
double
Position
));
double
Position
++;
}
else
if
(
AttributeType
.
INTEGER
.
equals
(
attribute
.
getType
()))
{
attribute
.
getOperation
().
operate
(
newData
.
getDataInteger
(
integerPosition
),
oldData
.
getDataInteger
(
integerPosition
));
integerPosition
++;
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/Operation.java
浏览文件 @
a798bb64
...
...
@@ -8,7 +8,7 @@ public interface Operation {
Long
operate
(
Long
newValue
,
Long
oldValue
);
Float
operate
(
Float
newValue
,
Float
oldValue
);
Double
operate
(
Double
newValue
,
Double
oldValue
);
Integer
operate
(
Integer
newValue
,
Integer
oldValue
);
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/AddOperation.java
浏览文件 @
a798bb64
...
...
@@ -15,7 +15,7 @@ public class AddOperation implements Operation {
return
newValue
+
oldValue
;
}
@Override
public
Float
operate
(
Float
newValue
,
Float
oldValue
)
{
@Override
public
Double
operate
(
Double
newValue
,
Double
oldValue
)
{
return
newValue
+
oldValue
;
}
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/CoverOperation.java
浏览文件 @
a798bb64
...
...
@@ -14,7 +14,7 @@ public class CoverOperation implements Operation {
return
newValue
;
}
@Override
public
Float
operate
(
Float
newValue
,
Float
oldValue
)
{
@Override
public
Double
operate
(
Double
newValue
,
Double
oldValue
)
{
return
newValue
;
}
...
...
apm-collector/apm-collector-stream/src/main/java/org/skywalking/apm/collector/stream/worker/impl/data/operate/NonOperation.java
浏览文件 @
a798bb64
...
...
@@ -14,7 +14,7 @@ public class NonOperation implements Operation {
return
oldValue
;
}
@Override
public
Float
operate
(
Float
newValue
,
Float
oldValue
)
{
@Override
public
Double
operate
(
Double
newValue
,
Double
oldValue
)
{
return
oldValue
;
}
...
...
apm-collector/apm-collector-stream/src/main/proto/RemoteCommonService.proto
浏览文件 @
a798bb64
...
...
@@ -16,13 +16,13 @@ message RemoteMessage {
message
RemoteData
{
int32
stringCapacity
=
1
;
int32
longCapacity
=
2
;
int32
float
Capacity
=
3
;
int32
double
Capacity
=
3
;
int32
integerCapacity
=
4
;
int32
byteCapacity
=
5
;
int32
booleanCapacity
=
6
;
repeated
string
dataStrings
=
7
;
repeated
int64
dataLongs
=
8
;
repeated
float
dataFloat
s
=
9
;
repeated
double
dataDouble
s
=
9
;
repeated
int32
dataIntegers
=
10
;
repeated
bytes
dataBytes
=
11
;
repeated
bool
dataBooleans
=
12
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录