Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
月轩居士
SkyWalking
提交
a382f3f7
S
SkyWalking
项目概览
月轩居士
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
4
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a382f3f7
编写于
7月 08, 2017
作者:
A
ascrutae
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support agent core testcase
上级
1135d257
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
473 addition
and
22 deletion
+473
-22
apm-application-toolkit/apm-toolkit-opentracing/src/main/java/org/skywalking/apm/toolkit/opentracing/SkywalkingSpan.java
...rg/skywalking/apm/toolkit/opentracing/SkywalkingSpan.java
+2
-1
apm-sniffer/apm-agent-core/pom.xml
apm-sniffer/apm-agent-core/pom.xml
+18
-0
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/conf/Config.java
.../main/java/org/skywalking/apm/agent/core/conf/Config.java
+1
-0
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java
...org/skywalking/apm/agent/core/context/ContextManager.java
+0
-1
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
...ing/apm/agent/core/context/trace/AbstractTracingSpan.java
+2
-2
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/ExitSpan.java
...org/skywalking/apm/agent/core/context/trace/ExitSpan.java
+2
-0
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/CollectorDiscoveryService.java
...king/apm/agent/core/remote/CollectorDiscoveryService.java
+6
-3
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/DiscoveryRestServiceClient.java
...ing/apm/agent/core/remote/DiscoveryRestServiceClient.java
+10
-7
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java
.../skywalking/apm/agent/core/remote/GRPCChannelManager.java
+7
-2
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
...king/apm/agent/core/remote/TraceSegmentServiceClient.java
+2
-1
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/boot/ServiceManagerTest.java
...rg/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+3
-5
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/ContextManagerTest.java
...skywalking/apm/agent/core/context/ContextManagerTest.java
+81
-0
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/DiscoveryRestServiceClientTest.java
...apm/agent/core/remote/DiscoveryRestServiceClientTest.java
+104
-0
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/GRPCChannelManagerTest.java
...walking/apm/agent/core/remote/GRPCChannelManagerTest.java
+103
-0
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClientTest.java
.../apm/agent/core/remote/TraceSegmentServiceClientTest.java
+132
-0
未找到文件。
apm-application-toolkit/apm-toolkit-opentracing/src/main/java/org/skywalking/apm/toolkit/opentracing/SkywalkingSpan.java
浏览文件 @
a382f3f7
...
...
@@ -17,12 +17,13 @@ public class SkywalkingSpan implements Span {
/**
* Create a shell span for {@link SkywalkingTracer#activeSpan()}
*
* @param tracer
*/
@NeedSnifferActivation
(
"1. set the span reference to the dynamic field of enhanced SkywalkingSpan"
)
public
SkywalkingSpan
(
SkywalkingTracer
tracer
){
public
SkywalkingSpan
(
SkywalkingTracer
tracer
)
{
}
...
...
apm-sniffer/apm-agent-core/pom.xml
浏览文件 @
a382f3f7
...
...
@@ -77,6 +77,24 @@
<version>
${jetty.version}
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
com.github.tomakehurst
</groupId>
<artifactId>
wiremock
</artifactId>
<version>
2.6.0
</version>
<scope>
test
</scope>
</dependency>
<dependency>
<groupId>
io.grpc
</groupId>
<artifactId>
grpc-testing
</artifactId>
<version>
1.4.0
</version>
<exclusions>
<exclusion>
<artifactId>
mockito-core
</artifactId>
<groupId>
org.mockito
</groupId>
</exclusion>
</exclusions>
<scope>
test
</scope>
</dependency>
</dependencies>
<build>
<extensions>
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/conf/Config.java
浏览文件 @
a382f3f7
...
...
@@ -34,6 +34,7 @@ public class Config {
}
public
static
class
Collector
{
public
static
long
DISCOVERY_CHECK_INTERVAL
=
60
*
1000
;
/**
* Collector REST-Service address.
* e.g.
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextManager.java
浏览文件 @
a382f3f7
...
...
@@ -5,7 +5,6 @@ import org.skywalking.apm.agent.core.boot.ServiceManager;
import
org.skywalking.apm.agent.core.conf.Config
;
import
org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
;
import
org.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.skywalking.apm.agent.core.dictionary.DictionaryUtil
;
import
org.skywalking.apm.agent.core.sampling.SamplingService
;
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
浏览文件 @
a382f3f7
...
...
@@ -214,14 +214,14 @@ public abstract class AbstractTracingSpan implements AbstractSpan {
spanBuilder
.
setParentSpanId
(
parentSpanId
);
spanBuilder
.
setStartTime
(
startTime
);
spanBuilder
.
setEndTime
(
endTime
);
if
(
operationId
=
=
DictionaryUtil
.
nullValue
())
{
if
(
operationId
!
=
DictionaryUtil
.
nullValue
())
{
spanBuilder
.
setOperationNameId
(
operationId
);
}
else
{
spanBuilder
.
setOperationName
(
operationName
);
}
spanBuilder
.
setSpanType
(
SpanType
.
Entry
);
spanBuilder
.
setSpanLayerValue
(
this
.
layer
.
getCode
());
if
(
componentId
=
=
DictionaryUtil
.
nullValue
())
{
if
(
componentId
!
=
DictionaryUtil
.
nullValue
())
{
spanBuilder
.
setComponentId
(
componentId
);
}
else
{
spanBuilder
.
setComponent
(
componentName
);
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/trace/ExitSpan.java
浏览文件 @
a382f3f7
...
...
@@ -2,6 +2,7 @@ package org.skywalking.apm.agent.core.context.trace;
import
org.skywalking.apm.agent.core.dictionary.DictionaryUtil
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.SpanType
;
import
org.skywalking.apm.network.trace.component.Component
;
/**
...
...
@@ -113,6 +114,7 @@ public class ExitSpan extends AbstractTracingSpan {
}
else
{
spanBuilder
.
setPeer
(
peer
);
}
spanBuilder
=
spanBuilder
.
setSpanType
(
SpanType
.
Exit
);
return
spanBuilder
;
}
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/CollectorDiscoveryService.java
浏览文件 @
a382f3f7
package
org.skywalking.apm.agent.core.remote
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
org.skywalking.apm.agent.core.boot.BootService
;
import
org.skywalking.apm.agent.core.conf.Config
;
/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
...
...
@@ -15,9 +18,9 @@ public class CollectorDiscoveryService implements BootService {
@Override
public
void
boot
()
throws
Throwable
{
Thread
collectorClientThread
=
new
Thread
(
new
DiscoveryRestServiceClient
(),
"collectorClientThread"
);
collectorClientThread
.
setDaemon
(
true
);
collectorClientThread
.
start
(
);
Executors
.
newSingleThreadScheduledExecutor
()
.
scheduleAtFixedRate
(
new
DiscoveryRestServiceClient
(),
0
,
Config
.
Collector
.
DISCOVERY_CHECK_INTERVAL
,
TimeUnit
.
MILLISECONDS
);
}
@Override
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/DiscoveryRestServiceClient.java
浏览文件 @
a382f3f7
...
...
@@ -31,22 +31,25 @@ public class DiscoveryRestServiceClient implements Runnable {
private
volatile
int
selectedServer
=
-
1
;
public
DiscoveryRestServiceClient
()
{
if
(
Config
.
Collector
.
SERVERS
==
null
||
Config
.
Collector
.
SERVERS
.
trim
().
length
()
==
0
)
{
logger
.
warn
(
"Collector server not configured."
);
return
;
}
serverList
=
Config
.
Collector
.
SERVERS
.
split
(
","
);
Random
r
=
new
Random
();
if
(
serverList
.
length
>
0
)
{
selectedServer
=
r
.
nextInt
(
serverList
.
length
);
}
}
@Override
public
void
run
()
{
while
(
true
)
{
try
{
try2Sleep
(
60
*
1000
);
findServerList
();
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
,
"Find server list fail."
);
}
try
{
findServerList
();
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
,
"Find server list fail."
);
}
}
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/GRPCChannelManager.java
浏览文件 @
a382f3f7
...
...
@@ -62,11 +62,16 @@ public class GRPCChannelManager implements BootService, Runnable {
.
maxInboundMessageSize
(
1024
*
1024
*
50
)
.
usePlaintext
(
true
);
managedChannel
=
channelBuilder
.
build
();
reconnect
=
false
;
notify
(
GRPCChannelStatus
.
CONNECTED
);
if
(!
managedChannel
.
isShutdown
()
&&
!
managedChannel
.
isTerminated
())
{
reconnect
=
false
;
notify
(
GRPCChannelStatus
.
CONNECTED
);
}
else
{
notify
(
GRPCChannelStatus
.
DISCONNECT
);
}
return
;
}
catch
(
Throwable
t
)
{
logger
.
error
(
t
,
"Create channel to {} fail."
,
server
);
notify
(
GRPCChannelStatus
.
DISCONNECT
);
}
}
...
...
apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
浏览文件 @
a382f3f7
...
...
@@ -26,6 +26,7 @@ import static org.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
*/
public
class
TraceSegmentServiceClient
implements
BootService
,
IConsumer
<
TraceSegment
>,
TracingContextListener
,
GRPCChannelListener
{
private
static
final
ILog
logger
=
LogManager
.
getLogger
(
TraceSegmentServiceClient
.
class
);
private
static
final
int
TIMEOUT
=
30
*
1000
;
private
volatile
DataCarrier
<
TraceSegment
>
carrier
;
private
volatile
TraceSegmentServiceGrpc
.
TraceSegmentServiceStub
serviceStub
;
...
...
@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
}
upstreamSegmentStreamObserver
.
onCompleted
();
status
.
wait4Finish
(
30
*
1000
);
status
.
wait4Finish
(
TIMEOUT
);
if
(
logger
.
isDebugEnable
())
{
logger
.
debug
(
"{} trace segments have been sent to collector."
,
data
.
size
());
...
...
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/boot/ServiceManagerTest.java
浏览文件 @
a382f3f7
...
...
@@ -26,7 +26,7 @@ public class ServiceManagerTest {
ServiceManager
.
INSTANCE
.
boot
();
HashMap
<
Class
,
BootService
>
registryService
=
getFieldValue
(
ServiceManager
.
INSTANCE
,
"bootedServices"
);
assertThat
(
registryService
.
size
(),
is
(
6
));
assertThat
(
registryService
.
size
(),
is
(
7
));
assertTraceSegmentServiceClient
(
ServiceManager
.
INSTANCE
.
findService
(
TraceSegmentServiceClient
.
class
));
assertContextManager
(
ServiceManager
.
INSTANCE
.
findService
(
ContextManager
.
class
));
...
...
@@ -48,7 +48,7 @@ public class ServiceManagerTest {
private
void
assertTracingContextListener
()
throws
Exception
{
List
<
TracingContextListener
>
LISTENERS
=
getFieldValue
(
TracingContext
.
ListenerManager
.
class
,
"LISTENERS"
);
assertThat
(
LISTENERS
.
size
(),
is
(
2
));
assertThat
(
LISTENERS
.
size
(),
is
(
3
));
assertThat
(
LISTENERS
.
contains
(
ServiceManager
.
INSTANCE
.
findService
(
ContextManager
.
class
)),
is
(
true
));
assertThat
(
LISTENERS
.
contains
(
ServiceManager
.
INSTANCE
.
findService
(
TraceSegmentServiceClient
.
class
)),
is
(
true
));
...
...
@@ -62,9 +62,7 @@ public class ServiceManagerTest {
assertNotNull
(
service
);
List
<
GRPCChannelListener
>
listeners
=
getFieldValue
(
service
,
"listeners"
);
assertEquals
(
listeners
.
size
(),
1
);
assertThat
(
listeners
.
get
(
0
),
is
((
GRPCChannelListener
)
ServiceManager
.
INSTANCE
.
findService
(
TraceSegmentServiceClient
.
class
)));
assertEquals
(
listeners
.
size
(),
3
);
}
private
void
assertSamplingService
(
SamplingService
service
)
{
...
...
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/context/ContextManagerTest.java
浏览文件 @
a382f3f7
package
org.skywalking.apm.agent.core.context
;
import
com.google.instrumentation.trace.Span
;
import
com.google.protobuf.InvalidProtocolBufferException
;
import
java.util.Date
;
import
java.util.List
;
import
org.junit.After
;
import
org.junit.Before
;
...
...
@@ -22,6 +25,13 @@ import org.skywalking.apm.agent.core.context.util.SegmentStoragePoint;
import
org.skywalking.apm.agent.core.context.util.TraceSegmentRefHelper
;
import
org.skywalking.apm.agent.core.context.util.TracingSegmentRunner
;
import
org.skywalking.apm.agent.core.dictionary.DictionaryUtil
;
import
org.skywalking.apm.network.proto.KeyWithStringValue
;
import
org.skywalking.apm.network.proto.LogMessage
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.SpanType
;
import
org.skywalking.apm.network.proto.TraceSegmentObject
;
import
org.skywalking.apm.network.proto.TraceSegmentReference
;
import
org.skywalking.apm.network.proto.UpstreamSegment
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
...
...
@@ -186,4 +196,75 @@ public class ContextManagerTest {
RemoteDownstreamConfig
.
Agent
.
APPLICATION_INSTANCE_ID
=
DictionaryUtil
.
nullValue
();
}
@Test
public
void
testTransform
()
throws
InvalidProtocolBufferException
{
ContextCarrier
contextCarrier
=
new
ContextCarrier
().
deserialize
(
"S.1499176688384.581928182.80935.69.1|3|1|#192.168.1.8 :18002|#/portal/|T.1499176688386.581928182.80935.69.2"
);
assertTrue
(
contextCarrier
.
isValid
());
AbstractSpan
firstEntrySpan
=
ContextManager
.
createEntrySpan
(
"/testFirstEntry"
,
contextCarrier
);
firstEntrySpan
.
setComponent
(
ComponentsDefine
.
TOMCAT
);
Tags
.
HTTP
.
METHOD
.
set
(
firstEntrySpan
,
"GET"
);
Tags
.
URL
.
set
(
firstEntrySpan
,
"127.0.0.1:8080"
);
SpanLayer
.
asHttp
(
firstEntrySpan
);
AbstractSpan
secondEntrySpan
=
ContextManager
.
createEntrySpan
(
"/testSecondEntry"
,
contextCarrier
);
secondEntrySpan
.
setComponent
(
ComponentsDefine
.
DUBBO
);
Tags
.
URL
.
set
(
firstEntrySpan
,
"dubbo://127.0.0.1:8080"
);
SpanLayer
.
asRPCFramework
(
secondEntrySpan
);
ContextCarrier
injectContextCarrier
=
new
ContextCarrier
();
AbstractSpan
exitSpan
=
ContextManager
.
createExitSpan
(
"/textExitSpan"
,
injectContextCarrier
,
"127.0.0.1:12800"
);
exitSpan
.
errorOccurred
();
exitSpan
.
log
(
new
RuntimeException
(
"exception"
));
exitSpan
.
setComponent
(
ComponentsDefine
.
HTTPCLIENT
);
SpanLayer
.
asHttp
(
exitSpan
);
ContextManager
.
stopSpan
();
ContextManager
.
stopSpan
();
ContextManager
.
stopSpan
();
TraceSegment
actualSegment
=
tracingData
.
getTraceSegments
().
get
(
0
);
UpstreamSegment
upstreamSegment
=
actualSegment
.
transform
();
assertThat
(
upstreamSegment
.
getGlobalTraceIdsCount
(),
is
(
1
));
TraceSegmentObject
traceSegmentObject
=
TraceSegmentObject
.
parseFrom
(
upstreamSegment
.
getSegment
());
TraceSegmentReference
reference
=
traceSegmentObject
.
getRefs
(
0
);
assertThat
(
reference
.
getEntryServiceName
(),
is
(
"/portal/"
));
assertThat
(
reference
.
getNetworkAddress
(),
is
(
"192.168.1.8 :18002"
));
assertThat
(
reference
.
getParentSpanId
(),
is
(
3
));
assertThat
(
traceSegmentObject
.
getApplicationId
(),
is
(
1
));
assertThat
(
traceSegmentObject
.
getRefsCount
(),
is
(
1
));
assertThat
(
traceSegmentObject
.
getSpansCount
(),
is
(
2
));
SpanObject
actualSpan
=
traceSegmentObject
.
getSpans
(
1
);
assertThat
(
actualSpan
.
getComponentId
(),
is
(
3
));
assertThat
(
actualSpan
.
getComponent
(),
is
(
""
));
assertThat
(
actualSpan
.
getOperationName
(),
is
(
"/testSecondEntry"
));
assertThat
(
actualSpan
.
getParentSpanId
(),
is
(-
1
));
assertThat
(
actualSpan
.
getSpanId
(),
is
(
0
));
assertThat
(
actualSpan
.
getSpanType
(),
is
(
SpanType
.
Entry
));
SpanObject
exitSpanObject
=
traceSegmentObject
.
getSpans
(
0
);
assertThat
(
exitSpanObject
.
getComponentId
(),
is
(
2
));
assertThat
(
exitSpanObject
.
getComponent
(),
is
(
""
));
assertThat
(
exitSpanObject
.
getSpanType
(),
is
(
SpanType
.
Exit
));
assertThat
(
exitSpanObject
.
getOperationName
(),
is
(
"/textExitSpan"
));
assertThat
(
exitSpanObject
.
getParentSpanId
(),
is
(
0
));
assertThat
(
exitSpanObject
.
getSpanId
(),
is
(
1
));
assertThat
(
exitSpanObject
.
getLogsCount
(),
is
(
1
));
LogMessage
logMessage
=
exitSpanObject
.
getLogs
(
0
);
assertThat
(
logMessage
.
getDataCount
(),
is
(
4
));
List
<
KeyWithStringValue
>
values
=
logMessage
.
getDataList
();
assertThat
(
values
.
get
(
0
).
getValue
(),
is
(
"error"
));
assertThat
(
values
.
get
(
1
).
getValue
(),
is
(
RuntimeException
.
class
.
getName
()));
assertThat
(
values
.
get
(
2
).
getValue
(),
is
(
"exception"
));
assertTrue
(
values
.
get
(
2
).
getValue
().
length
()
<=
4000
);
}
}
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/DiscoveryRestServiceClientTest.java
0 → 100644
浏览文件 @
a382f3f7
package
org.skywalking.apm.agent.core.remote
;
import
com.github.tomakehurst.wiremock.junit.WireMockRule
;
import
java.io.IOException
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.skywalking.apm.agent.core.conf.Config
;
import
org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig
;
import
static
com
.
github
.
tomakehurst
.
wiremock
.
client
.
WireMock
.
aResponse
;
import
static
com
.
github
.
tomakehurst
.
wiremock
.
client
.
WireMock
.
get
;
import
static
com
.
github
.
tomakehurst
.
wiremock
.
client
.
WireMock
.
stubFor
;
import
static
com
.
github
.
tomakehurst
.
wiremock
.
client
.
WireMock
.
urlEqualTo
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
hamcrest
.
MatcherAssert
.
assertThat
;
public
class
DiscoveryRestServiceClientTest
{
private
DiscoveryRestServiceClient
client
;
@Rule
public
WireMockRule
wireMockRule
=
new
WireMockRule
(
8089
);
@Before
public
void
setUpBeforeClass
()
{
Config
.
Collector
.
DISCOVERY_CHECK_INTERVAL
=
1
;
stubFor
(
get
(
urlEqualTo
(
"/withoutResult"
))
.
willReturn
(
aResponse
()
.
withStatus
(
200
)
.
withHeader
(
"Content-Type"
,
"application/json"
)
.
withBody
(
"[]"
)));
stubFor
(
get
(
urlEqualTo
(
"/withResult"
))
.
willReturn
(
aResponse
()
.
withStatus
(
200
)
.
withHeader
(
"Content-Type"
,
"application/json"
)
.
withBody
(
"['127.0.0.1:8080','127.0.0.1:8090']"
)));
stubFor
(
get
(
urlEqualTo
(
"/withSameResult"
))
.
willReturn
(
aResponse
()
.
withStatus
(
200
)
.
withHeader
(
"Content-Type"
,
"application/json"
)
.
withBody
(
"['127.0.0.1:8090','127.0.0.1:8080']"
)));
stubFor
(
get
(
urlEqualTo
(
"/withDifferenceResult"
))
.
willReturn
(
aResponse
()
.
withStatus
(
200
)
.
withHeader
(
"Content-Type"
,
"application/json"
)
.
withBody
(
"['127.0.0.1:9090','127.0.0.1:18090']"
)));
stubFor
(
get
(
urlEqualTo
(
"/with404"
))
.
willReturn
(
aResponse
()
.
withStatus
(
400
)));
}
@Test
public
void
testWithoutCollectorServer
()
throws
RESTResponseStatusError
,
IOException
{
client
=
new
DiscoveryRestServiceClient
();
client
.
run
();
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
size
(),
is
(
0
));
}
@Test
public
void
testWithGRPCAddress
()
throws
RESTResponseStatusError
,
IOException
{
Config
.
Collector
.
SERVERS
=
"127.0.0.1:8089"
;
Config
.
Collector
.
DISCOVERY_SERVICE_NAME
=
"/withResult"
;
client
=
new
DiscoveryRestServiceClient
();
client
.
run
();
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
size
(),
is
(
2
));
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
contains
(
"127.0.0.1:8080"
),
is
(
true
));
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
contains
(
"127.0.0.1:8090"
),
is
(
true
));
}
@Test
public
void
testWithoutGRPCAddress
()
throws
RESTResponseStatusError
,
IOException
{
Config
.
Collector
.
SERVERS
=
"127.0.0.1:8089"
;
Config
.
Collector
.
DISCOVERY_SERVICE_NAME
=
"/withoutResult"
;
client
=
new
DiscoveryRestServiceClient
();
client
.
run
();
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
size
(),
is
(
0
));
}
@Test
public
void
testChangeGrpcAddress
()
throws
RESTResponseStatusError
,
IOException
{
Config
.
Collector
.
SERVERS
=
"127.0.0.1:8089"
;
Config
.
Collector
.
DISCOVERY_SERVICE_NAME
=
"/withResult"
;
client
=
new
DiscoveryRestServiceClient
();
client
.
run
();
Config
.
Collector
.
DISCOVERY_SERVICE_NAME
=
"/withDifferenceResult"
;
client
.
run
();
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
size
(),
is
(
2
));
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
contains
(
"127.0.0.1:9090"
),
is
(
true
));
assertThat
(
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
contains
(
"127.0.0.1:18090"
),
is
(
true
));
}
@After
public
void
tearDown
()
{
Config
.
Collector
.
SERVERS
=
""
;
Config
.
Collector
.
DISCOVERY_SERVICE_NAME
=
"/grpc/address"
;
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
.
clear
();
}
}
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/GRPCChannelManagerTest.java
0 → 100644
浏览文件 @
a382f3f7
package
org.skywalking.apm.agent.core.remote
;
import
io.grpc.NameResolver
;
import
io.grpc.Status
;
import
io.grpc.StatusRuntimeException
;
import
io.grpc.internal.DnsNameResolverProvider
;
import
io.grpc.netty.NettyChannelBuilder
;
import
io.grpc.testing.GrpcServerRule
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.powermock.core.classloader.annotations.PrepareForTest
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.reflect.Whitebox
;
import
org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
hamcrest
.
MatcherAssert
.
assertThat
;
import
static
org
.
mockito
.
Matchers
.
any
;
import
static
org
.
mockito
.
Matchers
.
anyInt
;
import
static
org
.
mockito
.
Matchers
.
anyString
;
import
static
org
.
mockito
.
Mockito
.
doThrow
;
import
static
org
.
mockito
.
Mockito
.
times
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
mockStatic
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
when
;
@RunWith
(
PowerMockRunner
.
class
)
@PrepareForTest
({
GRPCChannelManager
.
class
,
NettyChannelBuilder
.
class
})
public
class
GRPCChannelManagerTest
{
@Rule
private
GrpcServerRule
grpcServerRule
=
new
GrpcServerRule
().
directExecutor
();
@Spy
private
GRPCChannelManager
grpcChannelManager
=
new
GRPCChannelManager
();
@Mock
private
NettyChannelBuilder
mock
;
@Spy
private
MockGRPCChannelListener
listener
=
new
MockGRPCChannelListener
();
@Before
public
void
setUp
()
throws
Throwable
{
List
<
String
>
grpcServers
=
new
ArrayList
<
String
>();
grpcServers
.
add
(
"127.0.0.1:2181"
);
RemoteDownstreamConfig
.
Collector
.
GRPC_SERVERS
=
grpcServers
;
Whitebox
.
setInternalState
(
grpcChannelManager
,
"retryCycle"
,
1
);
mockStatic
(
NettyChannelBuilder
.
class
);
when
(
NettyChannelBuilder
.
forAddress
(
anyString
(),
anyInt
())).
thenReturn
(
mock
);
when
(
mock
.
nameResolverFactory
(
any
(
NameResolver
.
Factory
.
class
))).
thenReturn
(
mock
);
when
(
mock
.
maxInboundMessageSize
(
anyInt
())).
thenReturn
(
mock
);
when
(
mock
.
usePlaintext
(
true
)).
thenReturn
(
mock
);
when
(
mock
.
build
()).
thenReturn
(
grpcServerRule
.
getChannel
());
grpcChannelManager
.
addChannelListener
(
listener
);
}
@Test
public
void
changeStatusToConnectedWithReportError
()
throws
Throwable
{
grpcChannelManager
.
reportError
(
new
StatusRuntimeException
(
Status
.
ABORTED
));
grpcChannelManager
.
run
();
verify
(
listener
,
times
(
1
)).
statusChanged
(
GRPCChannelStatus
.
CONNECTED
);
assertThat
(
listener
.
status
,
is
(
GRPCChannelStatus
.
CONNECTED
));
}
@Test
public
void
changeStatusToDisConnectedWithReportError
()
throws
Throwable
{
doThrow
(
new
RuntimeException
()).
when
(
mock
).
nameResolverFactory
(
any
(
NameResolver
.
Factory
.
class
));
grpcChannelManager
.
run
();
verify
(
listener
,
times
(
1
)).
statusChanged
(
GRPCChannelStatus
.
DISCONNECT
);
assertThat
(
listener
.
status
,
is
(
GRPCChannelStatus
.
DISCONNECT
));
}
@Test
public
void
reportErrorWithoutChangeStatus
()
throws
Throwable
{
grpcChannelManager
.
run
();
grpcChannelManager
.
reportError
(
new
RuntimeException
());
grpcChannelManager
.
run
();
verify
(
listener
,
times
(
1
)).
statusChanged
(
GRPCChannelStatus
.
CONNECTED
);
assertThat
(
listener
.
status
,
is
(
GRPCChannelStatus
.
CONNECTED
));
}
private
class
MockGRPCChannelListener
implements
GRPCChannelListener
{
private
GRPCChannelStatus
status
;
@Override
public
void
statusChanged
(
GRPCChannelStatus
status
)
{
this
.
status
=
status
;
}
}
}
apm-sniffer/apm-agent-core/src/test/java/org/skywalking/apm/agent/core/remote/TraceSegmentServiceClientTest.java
0 → 100644
浏览文件 @
a382f3f7
package
org.skywalking.apm.agent.core.remote
;
import
com.google.protobuf.InvalidProtocolBufferException
;
import
io.grpc.stub.StreamObserver
;
import
io.grpc.testing.GrpcServerRule
;
import
java.util.ArrayList
;
import
java.util.List
;
import
org.junit.Before
;
import
org.junit.BeforeClass
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.powermock.reflect.Whitebox
;
import
org.skywalking.apm.agent.core.boot.ServiceManager
;
import
org.skywalking.apm.agent.core.conf.RemoteDownstreamConfig
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.tag.Tags
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.skywalking.apm.agent.core.context.util.SegmentStorage
;
import
org.skywalking.apm.agent.core.context.util.SegmentStoragePoint
;
import
org.skywalking.apm.agent.core.context.util.TracingSegmentRunner
;
import
org.skywalking.apm.network.proto.Downstream
;
import
org.skywalking.apm.network.proto.SpanObject
;
import
org.skywalking.apm.network.proto.SpanType
;
import
org.skywalking.apm.network.proto.TraceSegmentObject
;
import
org.skywalking.apm.network.proto.TraceSegmentServiceGrpc
;
import
org.skywalking.apm.network.proto.UpstreamSegment
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
hamcrest
.
MatcherAssert
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
spy
;
@RunWith
(
TracingSegmentRunner
.
class
)
public
class
TraceSegmentServiceClientTest
{
@Rule
public
GrpcServerRule
grpcServerRule
=
new
GrpcServerRule
().
directExecutor
();
@SegmentStoragePoint
private
SegmentStorage
storage
;
private
TraceSegmentServiceClient
serviceClient
=
new
TraceSegmentServiceClient
();
private
List
<
UpstreamSegment
>
upstreamSegments
;
private
TraceSegmentServiceGrpc
.
TraceSegmentServiceImplBase
serviceImplBase
=
new
TraceSegmentServiceGrpc
.
TraceSegmentServiceImplBase
()
{
@Override
public
StreamObserver
<
UpstreamSegment
>
collect
(
final
StreamObserver
<
Downstream
>
responseObserver
)
{
return
new
StreamObserver
<
UpstreamSegment
>()
{
@Override
public
void
onNext
(
UpstreamSegment
value
)
{
upstreamSegments
.
add
(
value
);
}
@Override
public
void
onError
(
Throwable
t
)
{
}
@Override
public
void
onCompleted
()
{
responseObserver
.
onNext
(
Downstream
.
getDefaultInstance
());
responseObserver
.
onCompleted
();
}
};
}
};
@BeforeClass
public
static
void
setUpBeforeClass
()
{
RemoteDownstreamConfig
.
Agent
.
APPLICATION_ID
=
1
;
RemoteDownstreamConfig
.
Agent
.
APPLICATION_INSTANCE_ID
=
1
;
}
@Before
public
void
setUp
()
throws
Throwable
{
ServiceManager
.
INSTANCE
.
boot
();
Whitebox
.
setInternalState
(
ServiceManager
.
INSTANCE
.
findService
(
GRPCChannelManager
.
class
),
"reconnect"
,
false
);
spy
(
serviceClient
);
Whitebox
.
setInternalState
(
serviceClient
,
"serviceStub"
,
TraceSegmentServiceGrpc
.
newStub
(
grpcServerRule
.
getChannel
()));
Whitebox
.
setInternalState
(
serviceClient
,
"status"
,
GRPCChannelStatus
.
CONNECTED
);
upstreamSegments
=
new
ArrayList
<
UpstreamSegment
>();
}
@Test
public
void
testSendTraceSegmentWithoutException
()
throws
InvalidProtocolBufferException
{
grpcServerRule
.
getServiceRegistry
().
addService
(
serviceImplBase
);
AbstractSpan
firstEntrySpan
=
ContextManager
.
createEntrySpan
(
"/testFirstEntry"
,
null
);
firstEntrySpan
.
setComponent
(
ComponentsDefine
.
TOMCAT
);
Tags
.
HTTP
.
METHOD
.
set
(
firstEntrySpan
,
"GET"
);
Tags
.
URL
.
set
(
firstEntrySpan
,
"127.0.0.1:8080"
);
SpanLayer
.
asHttp
(
firstEntrySpan
);
ContextManager
.
stopSpan
();
serviceClient
.
consume
(
storage
.
getTraceSegments
());
assertThat
(
upstreamSegments
.
size
(),
is
(
1
));
UpstreamSegment
upstreamSegment
=
upstreamSegments
.
get
(
0
);
assertThat
(
upstreamSegment
.
getGlobalTraceIdsCount
(),
is
(
1
));
TraceSegmentObject
traceSegmentObject
=
TraceSegmentObject
.
parseFrom
(
upstreamSegment
.
getSegment
());
assertThat
(
traceSegmentObject
.
getRefsCount
(),
is
(
0
));
assertThat
(
traceSegmentObject
.
getSpansCount
(),
is
(
1
));
SpanObject
spanObject
=
traceSegmentObject
.
getSpans
(
0
);
assertThat
(
spanObject
.
getSpanType
(),
is
(
SpanType
.
Entry
));
assertThat
(
spanObject
.
getSpanId
(),
is
(
0
));
assertThat
(
spanObject
.
getParentSpanId
(),
is
(-
1
));
}
@Test
public
void
testSendTraceSegmentWithException
()
throws
InvalidProtocolBufferException
{
grpcServerRule
.
getServiceRegistry
().
addService
(
serviceImplBase
);
AbstractSpan
firstEntrySpan
=
ContextManager
.
createEntrySpan
(
"/testFirstEntry"
,
null
);
firstEntrySpan
.
setComponent
(
ComponentsDefine
.
TOMCAT
);
Tags
.
HTTP
.
METHOD
.
set
(
firstEntrySpan
,
"GET"
);
Tags
.
URL
.
set
(
firstEntrySpan
,
"127.0.0.1:8080"
);
SpanLayer
.
asHttp
(
firstEntrySpan
);
ContextManager
.
stopSpan
();
grpcServerRule
.
getServer
().
shutdownNow
();
serviceClient
.
consume
(
storage
.
getTraceSegments
());
assertThat
(
upstreamSegments
.
size
(),
is
(
0
));
boolean
reconnect
=
Whitebox
.
getInternalState
(
ServiceManager
.
INSTANCE
.
findService
(
GRPCChannelManager
.
class
),
"reconnect"
);
assertThat
(
reconnect
,
is
(
true
));
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录