Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
SkyWalking
提交
db610152
S
SkyWalking
项目概览
apache
/
SkyWalking
上一次同步 1 年多
通知
302
Star
21345
Fork
6091
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
db610152
编写于
1月 21, 2021
作者:
Z
zifeihan
提交者:
GitHub
1月 21, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Support dynamic change SAMPLE_N_PER_3_SECS on java agent (#6232)
上级
15204781
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
544 addition
and
30 deletion
+544
-30
CHANGES.md
CHANGES.md
+1
-0
apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ConfigurationDiscoveryCommand.java
...race/component/command/ConfigurationDiscoveryCommand.java
+1
-1
apm-protocol/apm-network/src/main/proto
apm-protocol/apm-network/src/main/proto
+1
-1
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java
...lking/apm/agent/core/commands/CommandExecutorService.java
+5
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ConfigurationDiscoveryCommandExecutor.java
...mands/executor/ConfigurationDiscoveryCommandExecutor.java
+45
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
...ava/org/apache/skywalking/apm/agent/core/conf/Config.java
+4
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/dynamic/AgentConfigChangeWatcher.java
...apm/agent/core/conf/dynamic/AgentConfigChangeWatcher.java
+62
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/dynamic/ConfigurationDiscoveryService.java
...gent/core/conf/dynamic/ConfigurationDiscoveryService.java
+240
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingRateWatcher.java
...ywalking/apm/agent/core/sampling/SamplingRateWatcher.java
+76
-0
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.java
...e/skywalking/apm/agent/core/sampling/SamplingService.java
+38
-20
apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
...ces/org.apache.skywalking.apm.agent.core.boot.BootService
+2
-1
apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
...he/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+2
-2
apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/sampling/SamplingRateWatcherTest.java
...king/apm/agent/core/sampling/SamplingRateWatcherTest.java
+57
-0
apm-sniffer/optional-plugins/trace-ignore-plugin/src/main/java/org/apache/skywalking/apm/plugin/trace/ignore/TraceIgnoreExtendService.java
...ing/apm/plugin/trace/ignore/TraceIgnoreExtendService.java
+1
-0
apm-sniffer/optional-plugins/trace-ignore-plugin/src/test/java/org/apache/skywalking/apm/plugin/trace/ignore/TraceIgnoreTest.java
...e/skywalking/apm/plugin/trace/ignore/TraceIgnoreTest.java
+5
-2
docs/en/setup/service-agent/java-agent/README.md
docs/en/setup/service-agent/java-agent/README.md
+1
-0
oap-server/server-bootstrap/src/main/resources/application.yml
...erver/server-bootstrap/src/main/resources/application.yml
+1
-1
oap-server/server-receiver-plugin/configuration-discovery-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/configuration/discovery/handler/grpc/ConfigurationDiscoveryServiceHandler.java
...ry/handler/grpc/ConfigurationDiscoveryServiceHandler.java
+2
-2
未找到文件。
CHANGES.md
浏览文件 @
db610152
...
...
@@ -36,6 +36,7 @@ Release Notes.
*
Support reading segmentId and spanId with toolkit.
*
Fix RestTemplate plugin recording url tag with wrong port
*
Support collecting logs and forwarding through gRPC.
*
Support config
`agent.sample_n_per_3_secs`
can be changed in the runtime.
#### OAP-Backend
*
Make meter receiver support MAL.
...
...
apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ConfigurationDiscoveryCommand.java
浏览文件 @
db610152
...
...
@@ -26,7 +26,7 @@ import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
public
class
ConfigurationDiscoveryCommand
extends
BaseCommand
implements
Serializable
,
Deserializable
<
ConfigurationDiscoveryCommand
>
{
public
static
final
Deserializable
<
ConfigurationDiscoveryCommand
>
DESERIALIZER
=
new
ConfigurationDiscoveryCommand
(
""
,
""
,
new
ArrayList
<>());
public
static
final
String
NAME
=
ConfigurationDiscoveryCommand
.
class
.
getSimpleName
()
;
public
static
final
String
NAME
=
"ConfigurationDiscoveryCommand"
;
public
static
final
String
UUID_CONST_NAME
=
"UUID"
;
public
static
final
String
SERIAL_NUMBER_CONST_NAME
=
"SerialNumber"
;
...
...
proto
@
ce9e4e8b
比较
101dc504
...
ce9e4e8b
Subproject commit
101dc50429c98147b1109cb15c8a6c623e751759
Subproject commit
ce9e4e8bd9e552443cc970df67ee25f17ff0d3b8
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/CommandExecutorService.java
浏览文件 @
db610152
...
...
@@ -21,9 +21,11 @@ import java.util.HashMap;
import
java.util.Map
;
import
org.apache.skywalking.apm.agent.core.boot.BootService
;
import
org.apache.skywalking.apm.agent.core.boot.DefaultImplementor
;
import
org.apache.skywalking.apm.agent.core.commands.executor.ConfigurationDiscoveryCommandExecutor
;
import
org.apache.skywalking.apm.agent.core.commands.executor.NoopCommandExecutor
;
import
org.apache.skywalking.apm.agent.core.commands.executor.ProfileTaskCommandExecutor
;
import
org.apache.skywalking.apm.network.trace.component.command.BaseCommand
;
import
org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand
;
import
org.apache.skywalking.apm.network.trace.component.command.ProfileTaskCommand
;
/**
...
...
@@ -43,6 +45,9 @@ public class CommandExecutorService implements BootService, CommandExecutor {
// Profile task executor
commandExecutorMap
.
put
(
ProfileTaskCommand
.
NAME
,
new
ProfileTaskCommandExecutor
());
//Get ConfigurationDiscoveryCommand executor.
commandExecutorMap
.
put
(
ConfigurationDiscoveryCommand
.
NAME
,
new
ConfigurationDiscoveryCommandExecutor
());
}
@Override
...
...
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ConfigurationDiscoveryCommandExecutor.java
0 → 100644
浏览文件 @
db610152
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.agent.core.commands.executor
;
import
org.apache.skywalking.apm.agent.core.boot.ServiceManager
;
import
org.apache.skywalking.apm.agent.core.commands.CommandExecutionException
;
import
org.apache.skywalking.apm.agent.core.commands.CommandExecutor
;
import
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
;
import
org.apache.skywalking.apm.agent.core.logging.api.ILog
;
import
org.apache.skywalking.apm.agent.core.logging.api.LogManager
;
import
org.apache.skywalking.apm.network.trace.component.command.BaseCommand
;
import
org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand
;
public
class
ConfigurationDiscoveryCommandExecutor
implements
CommandExecutor
{
private
static
final
ILog
LOGGER
=
LogManager
.
getLogger
(
ConfigurationDiscoveryCommandExecutor
.
class
);
@Override
public
void
execute
(
BaseCommand
command
)
throws
CommandExecutionException
{
try
{
ConfigurationDiscoveryCommand
agentDynamicConfigurationCommand
=
(
ConfigurationDiscoveryCommand
)
command
;
ServiceManager
.
INSTANCE
.
findService
(
ConfigurationDiscoveryService
.
class
)
.
handleConfigurationDiscoveryCommand
(
agentDynamicConfigurationCommand
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
e
,
"Handle ConfigurationDiscoveryCommand error, command:{}"
,
command
.
toString
());
}
}
}
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
浏览文件 @
db610152
...
...
@@ -164,6 +164,10 @@ public class Config {
*/
public
static
int
GET_PROFILE_TASK_INTERVAL
=
20
;
/**
* Get agent dynamic config interval
*/
public
static
int
GET_AGENT_DYNAMIC_CONFIG_INTERVAL
=
20
;
}
public
static
class
Profile
{
...
...
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/dynamic/AgentConfigChangeWatcher.java
0 → 100644
浏览文件 @
db610152
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.agent.core.conf.dynamic
;
import
lombok.Getter
;
import
lombok.RequiredArgsConstructor
;
@Getter
public
abstract
class
AgentConfigChangeWatcher
{
// Config key, should match KEY in the Table of Agent Configuration Properties.
private
final
String
propertyKey
;
public
AgentConfigChangeWatcher
(
String
propertyKey
)
{
this
.
propertyKey
=
propertyKey
;
}
/**
* Notify the watcher, the new value received.
*
* @param value of new.
*/
public
abstract
void
notify
(
ConfigChangeEvent
value
);
/**
* @return current value of current config.
*/
public
abstract
String
value
();
@Override
public
String
toString
()
{
return
"AgentConfigChangeWatcher{"
+
"propertyKey='"
+
propertyKey
+
'\''
+
'}'
;
}
@Getter
@RequiredArgsConstructor
public
static
class
ConfigChangeEvent
{
private
final
String
newValue
;
private
final
EventType
eventType
;
}
public
enum
EventType
{
ADD
,
MODIFY
,
DELETE
}
}
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/dynamic/ConfigurationDiscoveryService.java
0 → 100644
浏览文件 @
db610152
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.agent.core.conf.dynamic
;
import
io.grpc.Channel
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledFuture
;
import
java.util.concurrent.TimeUnit
;
import
java.util.stream.Collectors
;
import
lombok.Getter
;
import
org.apache.skywalking.apm.agent.core.boot.BootService
;
import
org.apache.skywalking.apm.agent.core.boot.DefaultImplementor
;
import
org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory
;
import
org.apache.skywalking.apm.agent.core.boot.ServiceManager
;
import
org.apache.skywalking.apm.agent.core.commands.CommandService
;
import
org.apache.skywalking.apm.agent.core.conf.Config
;
import
org.apache.skywalking.apm.agent.core.logging.api.ILog
;
import
org.apache.skywalking.apm.agent.core.logging.api.LogManager
;
import
org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener
;
import
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
;
import
org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus
;
import
org.apache.skywalking.apm.network.language.agent.v3.ConfigurationDiscoveryServiceGrpc
;
import
org.apache.skywalking.apm.network.language.agent.v3.ConfigurationSyncRequest
;
import
org.apache.skywalking.apm.network.common.v3.Commands
;
import
org.apache.skywalking.apm.network.common.v3.KeyStringValuePair
;
import
org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand
;
import
org.apache.skywalking.apm.util.RunnableWithExceptionProtection
;
import
org.apache.skywalking.apm.util.StringUtil
;
import
static
org
.
apache
.
skywalking
.
apm
.
agent
.
core
.
conf
.
Config
.
Collector
.
GRPC_UPSTREAM_TIMEOUT
;
@DefaultImplementor
public
class
ConfigurationDiscoveryService
implements
BootService
,
GRPCChannelListener
{
/**
* UUID of the last return value.
*/
private
String
uuid
;
private
final
Register
register
=
new
Register
();
private
volatile
ScheduledFuture
<?>
getDynamicConfigurationFuture
;
private
volatile
GRPCChannelStatus
status
=
GRPCChannelStatus
.
DISCONNECT
;
private
volatile
ConfigurationDiscoveryServiceGrpc
.
ConfigurationDiscoveryServiceBlockingStub
configurationDiscoveryServiceBlockingStub
;
private
static
final
ILog
LOGGER
=
LogManager
.
getLogger
(
ConfigurationDiscoveryService
.
class
);
@Override
public
void
statusChanged
(
final
GRPCChannelStatus
status
)
{
if
(
GRPCChannelStatus
.
CONNECTED
.
equals
(
status
))
{
Channel
channel
=
ServiceManager
.
INSTANCE
.
findService
(
GRPCChannelManager
.
class
).
getChannel
();
configurationDiscoveryServiceBlockingStub
=
ConfigurationDiscoveryServiceGrpc
.
newBlockingStub
(
channel
);
}
else
{
configurationDiscoveryServiceBlockingStub
=
null
;
}
this
.
status
=
status
;
}
@Override
public
void
prepare
()
throws
Throwable
{
ServiceManager
.
INSTANCE
.
findService
(
GRPCChannelManager
.
class
).
addChannelListener
(
this
);
}
@Override
public
void
boot
()
throws
Throwable
{
getDynamicConfigurationFuture
=
Executors
.
newSingleThreadScheduledExecutor
(
new
DefaultNamedThreadFactory
(
"ConfigurationDiscoveryService"
)
).
scheduleAtFixedRate
(
new
RunnableWithExceptionProtection
(
this
::
getAgentDynamicConfig
,
t
->
LOGGER
.
error
(
"Sync config from OAP error."
,
t
)
),
Config
.
Collector
.
GET_AGENT_DYNAMIC_CONFIG_INTERVAL
,
Config
.
Collector
.
GET_AGENT_DYNAMIC_CONFIG_INTERVAL
,
TimeUnit
.
SECONDS
);
}
@Override
public
void
onComplete
()
throws
Throwable
{
}
@Override
public
void
shutdown
()
throws
Throwable
{
if
(
getDynamicConfigurationFuture
!=
null
)
{
getDynamicConfigurationFuture
.
cancel
(
true
);
}
}
/**
* Register dynamic configuration watcher.
*
* @param watcher dynamic configuration watcher
*/
public
void
registerAgentConfigChangeWatcher
(
AgentConfigChangeWatcher
watcher
)
{
WatcherHolder
holder
=
new
WatcherHolder
(
watcher
);
if
(
register
.
containsKey
(
holder
.
getKey
()))
{
throw
new
IllegalStateException
(
"Duplicate register, watcher="
+
watcher
);
}
register
.
put
(
holder
.
getKey
(),
holder
);
}
/**
* Process ConfigurationDiscoveryCommand and notify each configuration watcher.
*
* @param configurationDiscoveryCommand Describe dynamic configuration information
*/
public
void
handleConfigurationDiscoveryCommand
(
ConfigurationDiscoveryCommand
configurationDiscoveryCommand
)
{
final
String
responseUuid
=
configurationDiscoveryCommand
.
getUuid
();
final
List
<
KeyStringValuePair
>
config
=
configurationDiscoveryCommand
.
getConfig
();
if
(
responseUuid
!=
null
&&
Objects
.
equals
(
this
.
uuid
,
responseUuid
))
{
return
;
}
config
.
forEach
(
property
->
{
String
propertyKey
=
property
.
getKey
();
WatcherHolder
holder
=
register
.
get
(
propertyKey
);
if
(
holder
!=
null
)
{
AgentConfigChangeWatcher
watcher
=
holder
.
getWatcher
();
String
newPropertyValue
=
property
.
getValue
();
if
(
StringUtil
.
isBlank
(
newPropertyValue
))
{
if
(
watcher
.
value
()
!=
null
)
{
// Notify watcher, the new value is null with delete event type.
watcher
.
notify
(
new
AgentConfigChangeWatcher
.
ConfigChangeEvent
(
null
,
AgentConfigChangeWatcher
.
EventType
.
DELETE
));
}
else
{
// Don't need to notify, stay in null.
}
}
else
{
if
(!
newPropertyValue
.
equals
(
watcher
.
value
()))
{
watcher
.
notify
(
new
AgentConfigChangeWatcher
.
ConfigChangeEvent
(
newPropertyValue
,
AgentConfigChangeWatcher
.
EventType
.
MODIFY
));
}
else
{
// Don't need to notify, stay in the same config value.
}
}
}
else
{
LOGGER
.
warn
(
"Config {} from OAP, doesn't match any watcher, ignore."
,
propertyKey
);
}
});
this
.
uuid
=
responseUuid
;
LOGGER
.
trace
(
"Current configurations after the sync, configurations:{}"
,
register
.
toString
());
}
/**
* get agent dynamic config through gRPC.
*/
private
void
getAgentDynamicConfig
()
{
LOGGER
.
debug
(
"ConfigurationDiscoveryService running, status:{}."
,
status
);
if
(
GRPCChannelStatus
.
CONNECTED
.
equals
(
status
))
{
try
{
ConfigurationSyncRequest
.
Builder
builder
=
ConfigurationSyncRequest
.
newBuilder
();
builder
.
setService
(
Config
.
Agent
.
SERVICE_NAME
);
if
(
null
!=
uuid
)
{
builder
.
setUuid
(
uuid
);
}
if
(
configurationDiscoveryServiceBlockingStub
!=
null
)
{
final
Commands
commands
=
configurationDiscoveryServiceBlockingStub
.
withDeadlineAfter
(
GRPC_UPSTREAM_TIMEOUT
,
TimeUnit
.
SECONDS
).
fetchConfigurations
(
builder
.
build
());
ServiceManager
.
INSTANCE
.
findService
(
CommandService
.
class
).
receiveCommand
(
commands
);
}
}
catch
(
Throwable
t
)
{
LOGGER
.
error
(
t
,
"ConfigurationDiscoveryService execute fail."
);
ServiceManager
.
INSTANCE
.
findService
(
GRPCChannelManager
.
class
).
reportError
(
t
);
}
}
}
/**
* Local dynamic configuration center.
*/
public
static
class
Register
{
private
final
Map
<
String
,
WatcherHolder
>
register
=
new
HashMap
<>();
private
boolean
containsKey
(
String
key
)
{
return
register
.
containsKey
(
key
);
}
private
void
put
(
String
key
,
WatcherHolder
holder
)
{
register
.
put
(
key
,
holder
);
}
public
WatcherHolder
get
(
String
name
)
{
return
register
.
get
(
name
);
}
@Override
public
String
toString
()
{
ArrayList
<
String
>
registerTableDescription
=
new
ArrayList
<>(
register
.
size
());
register
.
forEach
((
key
,
holder
)
->
{
AgentConfigChangeWatcher
watcher
=
holder
.
getWatcher
();
registerTableDescription
.
add
(
new
StringBuilder
().
append
(
"key:"
)
.
append
(
key
)
.
append
(
"value(current):"
)
.
append
(
watcher
.
value
()).
toString
());
});
return
registerTableDescription
.
stream
().
collect
(
Collectors
.
joining
(
","
,
"["
,
"]"
));
}
}
@Getter
private
static
class
WatcherHolder
{
private
final
AgentConfigChangeWatcher
watcher
;
private
final
String
key
;
public
WatcherHolder
(
AgentConfigChangeWatcher
watcher
)
{
this
.
watcher
=
watcher
;
this
.
key
=
watcher
.
getPropertyKey
();
}
}
}
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingRateWatcher.java
0 → 100644
浏览文件 @
db610152
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.agent.core.sampling
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.skywalking.apm.agent.core.conf.Config
;
import
org.apache.skywalking.apm.agent.core.conf.dynamic.AgentConfigChangeWatcher
;
import
org.apache.skywalking.apm.agent.core.logging.api.ILog
;
import
org.apache.skywalking.apm.agent.core.logging.api.LogManager
;
public
class
SamplingRateWatcher
extends
AgentConfigChangeWatcher
{
private
static
final
ILog
LOGGER
=
LogManager
.
getLogger
(
SamplingRateWatcher
.
class
);
private
final
AtomicInteger
samplingRate
;
private
final
SamplingService
samplingService
;
public
SamplingRateWatcher
(
final
String
propertyKey
,
SamplingService
samplingService
)
{
super
(
propertyKey
);
this
.
samplingRate
=
new
AtomicInteger
(
getDefaultValue
());
this
.
samplingService
=
samplingService
;
}
private
void
activeSetting
(
String
config
)
{
if
(
LOGGER
.
isDebugEnable
())
{
LOGGER
.
debug
(
"Updating using new static config: {}"
,
config
);
}
try
{
this
.
samplingRate
.
set
(
Integer
.
parseInt
(
config
));
/*
* We need to notify samplingService the samplingRate changed.
*/
samplingService
.
handleSamplingRateChanged
();
}
catch
(
NumberFormatException
ex
)
{
LOGGER
.
error
(
ex
,
"Cannot load {} from: {}"
,
getPropertyKey
(),
config
);
}
}
@Override
public
void
notify
(
final
ConfigChangeEvent
value
)
{
if
(
EventType
.
DELETE
.
equals
(
value
.
getEventType
()))
{
activeSetting
(
String
.
valueOf
(
getDefaultValue
()));
}
else
{
activeSetting
(
value
.
getNewValue
());
}
}
@Override
public
String
value
()
{
return
String
.
valueOf
(
samplingRate
.
get
());
}
private
int
getDefaultValue
()
{
return
Config
.
Agent
.
SAMPLE_N_PER_3_SECS
;
}
public
int
getSamplingRate
()
{
return
samplingRate
.
get
();
}
}
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.java
浏览文件 @
db610152
...
...
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import
org.apache.skywalking.apm.agent.core.boot.BootService
;
import
org.apache.skywalking.apm.agent.core.boot.DefaultImplementor
;
import
org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory
;
import
org.apache.skywalking.apm.agent.core.boot.ServiceManager
;
import
org.apache.skywalking.apm.agent.core.conf.Config
;
import
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
;
import
org.apache.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.apache.skywalking.apm.agent.core.logging.api.ILog
;
import
org.apache.skywalking.apm.agent.core.logging.api.LogManager
;
...
...
@@ -47,30 +49,19 @@ public class SamplingService implements BootService {
private
volatile
AtomicInteger
samplingFactorHolder
;
private
volatile
ScheduledFuture
<?>
scheduledFuture
;
private
SamplingRateWatcher
samplingRateWatcher
;
@Override
public
void
prepare
()
{
samplingRateWatcher
=
new
SamplingRateWatcher
(
"agent.sample_n_per_3_secs"
,
this
);
}
@Override
public
void
boot
()
{
if
(
scheduledFuture
!=
null
)
{
/*
* If {@link #boot()} invokes twice, mostly in test cases,
* cancel the old one.
*/
scheduledFuture
.
cancel
(
true
);
}
if
(
Config
.
Agent
.
SAMPLE_N_PER_3_SECS
>
0
)
{
on
=
true
;
this
.
resetSamplingFactor
();
ScheduledExecutorService
service
=
Executors
.
newSingleThreadScheduledExecutor
(
new
DefaultNamedThreadFactory
(
"SamplingService"
));
scheduledFuture
=
service
.
scheduleAtFixedRate
(
new
RunnableWithExceptionProtection
(
this
::
resetSamplingFactor
,
t
->
LOGGER
.
error
(
"unexpected exception."
,
t
)),
0
,
3
,
TimeUnit
.
SECONDS
);
LOGGER
.
debug
(
"Agent sampling mechanism started. Sample {} traces in 3 seconds."
,
Config
.
Agent
.
SAMPLE_N_PER_3_SECS
);
}
ServiceManager
.
INSTANCE
.
findService
(
ConfigurationDiscoveryService
.
class
)
.
registerAgentConfigChangeWatcher
(
samplingRateWatcher
);
handleSamplingRateChanged
();
}
@Override
...
...
@@ -92,7 +83,7 @@ public class SamplingService implements BootService {
public
boolean
trySampling
(
String
operationName
)
{
if
(
on
)
{
int
factor
=
samplingFactorHolder
.
get
();
if
(
factor
<
Config
.
Agent
.
SAMPLE_N_PER_3_SECS
)
{
if
(
factor
<
samplingRateWatcher
.
getSamplingRate
()
)
{
return
samplingFactorHolder
.
compareAndSet
(
factor
,
factor
+
1
);
}
else
{
return
false
;
...
...
@@ -114,4 +105,31 @@ public class SamplingService implements BootService {
private
void
resetSamplingFactor
()
{
samplingFactorHolder
=
new
AtomicInteger
(
0
);
}
}
/**
* Handle the samplingRate changed.
*/
void
handleSamplingRateChanged
()
{
if
(
samplingRateWatcher
.
getSamplingRate
()
>
0
)
{
if
(!
on
)
{
on
=
true
;
this
.
resetSamplingFactor
();
ScheduledExecutorService
service
=
Executors
.
newSingleThreadScheduledExecutor
(
new
DefaultNamedThreadFactory
(
"SamplingService"
));
scheduledFuture
=
service
.
scheduleAtFixedRate
(
new
RunnableWithExceptionProtection
(
this
::
resetSamplingFactor
,
t
->
LOGGER
.
error
(
"unexpected exception."
,
t
)),
0
,
3
,
TimeUnit
.
SECONDS
);
LOGGER
.
debug
(
"Agent sampling mechanism started. Sample {} traces in 3 seconds."
,
samplingRateWatcher
.
getSamplingRate
()
);
}
}
else
{
if
(
on
)
{
if
(
scheduledFuture
!=
null
)
{
scheduledFuture
.
cancel
(
true
);
}
on
=
false
;
}
}
}
}
\ No newline at end of file
apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
浏览文件 @
db610152
...
...
@@ -32,4 +32,5 @@ org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.context.status.StatusCheckService
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
\ No newline at end of file
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
\ No newline at end of file
apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
浏览文件 @
db610152
...
...
@@ -58,7 +58,7 @@ public class ServiceManagerTest {
public
void
testServiceDependencies
()
throws
Exception
{
HashMap
<
Class
,
BootService
>
registryService
=
getFieldValue
(
ServiceManager
.
INSTANCE
,
"bootedServices"
);
assertThat
(
registryService
.
size
(),
is
(
1
7
));
assertThat
(
registryService
.
size
(),
is
(
1
8
));
assertTraceSegmentServiceClient
(
ServiceManager
.
INSTANCE
.
findService
(
TraceSegmentServiceClient
.
class
));
assertContextManager
(
ServiceManager
.
INSTANCE
.
findService
(
ContextManager
.
class
));
...
...
@@ -107,7 +107,7 @@ public class ServiceManagerTest {
assertNotNull
(
service
);
List
<
GRPCChannelListener
>
listeners
=
getFieldValue
(
service
,
"listeners"
);
assertEquals
(
listeners
.
size
(),
7
);
assertEquals
(
listeners
.
size
(),
8
);
}
private
void
assertSamplingService
(
SamplingService
service
)
{
...
...
apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/sampling/SamplingRateWatcherTest.java
0 → 100644
浏览文件 @
db610152
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.agent.core.sampling
;
import
org.apache.skywalking.apm.agent.core.conf.dynamic.AgentConfigChangeWatcher
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.powermock.reflect.Whitebox
;
public
class
SamplingRateWatcherTest
{
private
SamplingService
samplingService
=
new
SamplingService
();
@Before
public
void
setUp
()
{
samplingService
.
prepare
();
}
@Test
public
void
testConfigModifyEvent
()
{
SamplingRateWatcher
samplingRateWatcher
=
Whitebox
.
getInternalState
(
samplingService
,
"samplingRateWatcher"
);
samplingRateWatcher
.
notify
(
new
AgentConfigChangeWatcher
.
ConfigChangeEvent
(
"10"
,
AgentConfigChangeWatcher
.
EventType
.
MODIFY
));
Assert
.
assertEquals
(
10
,
samplingRateWatcher
.
getSamplingRate
());
Assert
.
assertEquals
(
"agent.sample_n_per_3_secs"
,
samplingRateWatcher
.
getPropertyKey
());
}
@Test
public
void
testConfigDeleteEvent
()
{
SamplingRateWatcher
samplingRateWatcher
=
Whitebox
.
getInternalState
(
samplingService
,
"samplingRateWatcher"
);
samplingRateWatcher
.
notify
(
new
AgentConfigChangeWatcher
.
ConfigChangeEvent
(
null
,
AgentConfigChangeWatcher
.
EventType
.
DELETE
));
Assert
.
assertEquals
(
"agent.sample_n_per_3_secs"
,
samplingRateWatcher
.
getPropertyKey
());
}
}
apm-sniffer/optional-plugins/trace-ignore-plugin/src/main/java/org/apache/skywalking/apm/plugin/trace/ignore/TraceIgnoreExtendService.java
浏览文件 @
db610152
...
...
@@ -47,6 +47,7 @@ public class TraceIgnoreExtendService extends SamplingService {
@Override
public
void
prepare
()
{
super
.
prepare
();
}
@Override
...
...
apm-sniffer/optional-plugins/trace-ignore-plugin/src/test/java/org/apache/skywalking/apm/plugin/trace/ignore/TraceIgnoreTest.java
浏览文件 @
db610152
...
...
@@ -29,6 +29,7 @@ import org.junit.Assert;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.contrib.java.lang.system.EnvironmentVariables
;
import
org.powermock.reflect.Whitebox
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
hamcrest
.
MatcherAssert
.
assertThat
;
...
...
@@ -51,8 +52,10 @@ public class TraceIgnoreTest {
@Test
public
void
testTraceIgnore
()
{
SamplingService
service
=
ServiceManager
.
INSTANCE
.
findService
(
SamplingService
.
class
);
IgnoreConfig
.
Trace
.
IGNORE_PATH
=
"/eureka/**"
;
service
.
boot
();
Whitebox
.
setInternalState
(
service
,
"patterns"
,
new
String
[]
{
"/eureka/**"
}
);
Assert
.
assertFalse
(
service
.
trySampling
(
"/eureka/apps"
));
Assert
.
assertTrue
(
service
.
trySampling
(
"/consul/apps"
));
...
...
docs/en/setup/service-agent/java-agent/README.md
浏览文件 @
db610152
...
...
@@ -94,6 +94,7 @@ property key | Description | Default |
`collector.backend_service`
|Collector SkyWalking trace receiver service addresses.|
`127.0.0.1:11800`
|
`collector.grpc_upstream_timeout`
|How long grpc client will timeout in sending data to upstream. Unit is second.|
`30`
seconds|
`collector.get_profile_task_interval`
|Sniffer get profile task list interval.|
`20`
|
`collector.get_agent_dynamic_config_interval`
|Sniffer get agent dynamic config interval|
`20`
|
`logging.level`
|Log level: TRACE, DEBUG, INFO, WARN, ERROR, OFF. Default is info.|
`INFO`
|
`logging.file_name`
|Log file name.|
`skywalking-api.log`
|
`logging.output`
| Log output. Default is FILE. Use CONSOLE means output to stdout. |
`FILE`
|
...
...
oap-server/server-bootstrap/src/main/resources/application.yml
浏览文件 @
db610152
...
...
@@ -415,6 +415,6 @@ health-checker:
checkIntervalSeconds
:
${SW_HEALTH_CHECKER_INTERVAL_SECONDS:5}
configuration-discovery
:
selector
:
${SW_CONFIGURATION_DISCOVERY:
-
}
selector
:
${SW_CONFIGURATION_DISCOVERY:
default
}
default
:
disableMessageDigest
:
${SW_DISABLE_MESSAGE_DIGEST:false}
oap-server/server-receiver-plugin/configuration-discovery-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/recevier/configuration/discovery/handler/grpc/ConfigurationDiscoveryServiceHandler.java
浏览文件 @
db610152
...
...
@@ -24,8 +24,8 @@ import java.util.List;
import
java.util.Objects
;
import
java.util.UUID
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.skywalking.apm.network.
agent.dynamic.configuration
.v3.ConfigurationDiscoveryServiceGrpc
;
import
org.apache.skywalking.apm.network.
agent.dynamic.configuration
.v3.ConfigurationSyncRequest
;
import
org.apache.skywalking.apm.network.
language.agent
.v3.ConfigurationDiscoveryServiceGrpc
;
import
org.apache.skywalking.apm.network.
language.agent
.v3.ConfigurationSyncRequest
;
import
org.apache.skywalking.apm.network.common.v3.Commands
;
import
org.apache.skywalking.apm.network.common.v3.KeyStringValuePair
;
import
org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录