未验证 提交 fcf44ecd 编写于 作者: G Gao Hongtao 提交者: GitHub

Polish segment codes (#2)

上级 a0b15b25
package go2sky
import "sync/atomic"
func newSegmentSpan(defaultSpan *defaultSpan, parentSpan Span) Span {
s := &segmentSpanImpl{
defaultSpan: *defaultSpan,
segmentContext: &segmentContext{},
}
if parentSpan == nil {
return newSegmentRoot(s)
}
if rootSpan, ok := parentSpan.(segmentSpan); ok {
if rootSpan.segmentRegister() {
s.segmentContext = rootSpan.context()
return s
}
return newSegmentRoot(s)
}
return newSegmentRoot(s)
}
type segmentSpan interface {
context() *segmentContext
segmentRegister() bool
}
type segmentSpanImpl struct {
defaultSpan
*segmentContext
}
func (s *segmentSpanImpl) context() *segmentContext {
return s.segmentContext
}
type segmentContext struct {
collect chan<- Span
refNum *int32
}
func (s *segmentSpanImpl) segmentRegister() bool {
for {
o := atomic.LoadInt32(s.refNum)
if o < 0 {
return false
}
if atomic.CompareAndSwapInt32(s.refNum, o, o+1) {
return true
}
}
}
func (s *segmentSpanImpl) End() {
go func() {
s.collect <- s
}()
}
type rootSegmentSpan struct {
*segmentSpanImpl
notify <-chan Span
segment []Span
doneCh chan int32
}
func (rs *rootSegmentSpan) End() {
go func() {
rs.doneCh <- atomic.SwapInt32(rs.refNum, -1)
}()
}
func newSegmentRoot(segmentSpan *segmentSpanImpl) *rootSegmentSpan {
s := &rootSegmentSpan{
segmentSpanImpl: segmentSpan,
}
var init int32
s.refNum = &init
ch := make(chan Span)
s.collect = ch
s.notify = ch
s.segment = make([]Span, 0, 10)
s.doneCh = make(chan int32)
go func() {
total := -1
defer close(ch)
defer close(s.doneCh)
for {
select {
case span := <-s.notify:
s.segment = append(s.segment, span)
case n := <-s.doneCh:
total = int(n)
}
if total == len(s.segment) {
break
}
}
s.tracer.reporter.Send(append(s.segment, s))
}()
return s
}
......@@ -69,14 +69,14 @@ func TestAsyncMultipleSegments(t *testing.T) {
reportWg.Wait()
reportWg.Add(2)
go func() {
oSpan, ctx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
oSpan, subCtx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(subCtx, MockInjector)
eSpan.End()
oSpan.End()
}()
go func() {
oSpan, ctx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
oSpan, subCtx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(subCtx, MockInjector)
eSpan.End()
oSpan.End()
}()
......@@ -116,7 +116,7 @@ func (r *MockReporter) Verify(mm ...int) error {
}
for i, m := range mm {
if m != len(r.Message[i]) {
return fmt.Errorf("span size mismatch. expected %d actual %d", len(mm), len(r.Message))
return fmt.Errorf("span size mismatch. expected %d actual %d", m, len(r.Message[i]))
}
}
return nil
......
......@@ -2,8 +2,6 @@ package go2sky
import (
"context"
"sync/atomic"
"github.com/tetratelabs/go2sky/propagation"
)
......@@ -36,27 +34,18 @@ func (t *Tracer) CreateEntrySpan(ctx context.Context, extractor propagation.Extr
}
// CreateLocalSpan creates and starts a span for local usage
func (t *Tracer) CreateLocalSpan(ctx context.Context, opts ...SpanOption) (Span, context.Context, error) {
root := true
if parentSpan, ok := ctx.Value(key).(Span); ok && parentSpan != nil {
func (t *Tracer) CreateLocalSpan(ctx context.Context, opts ...SpanOption) (s Span, c context.Context, err error) {
parentSpan, ok := ctx.Value(key).(Span)
if ok && parentSpan != nil {
opts = append(opts, WithParent(parentSpan.Context()))
if parentRootSpan, okk := parentSpan.(SegmentSpan); okk {
root = !parentRootSpan.SegmentRegister()
opts = append(opts, func(s *defaultSpan) {
s.segmentContext = parentRootSpan.SegmentContext()
})
}
}
s := &defaultSpan{
ds := &defaultSpan{
tracer: t,
root: root,
}
for _, opt := range opts {
opt(s)
}
if root {
s.createSegment()
opt(ds)
}
s = newSegmentSpan(ds, parentSpan)
return s, context.WithValue(ctx, key, s), nil
}
......@@ -80,85 +69,17 @@ type Span interface {
End()
}
// SegmentSpan interface as segment span specification
type SegmentSpan interface {
SegmentRegister() bool
SegmentContext() segmentContext
}
type defaultSpan struct {
propagation.ContextCarrier
segmentContext
root bool
notify <-chan Span
segment []Span
doneCh chan int32
tracer *Tracer
}
type segmentContext struct {
collect chan<- Span
refNum *int32
}
func (s *defaultSpan) Context() propagation.ContextCarrier {
return s.ContextCarrier
}
func (s *defaultSpan) SegmentRegister() bool {
for {
o := atomic.LoadInt32(s.refNum)
if o < 0 {
return false
}
if atomic.CompareAndSwapInt32(s.refNum, o, o+1) {
return true
}
}
}
func (s *defaultSpan) SegmentContext() segmentContext {
return s.segmentContext
}
func (s *defaultSpan) End() {
go func() {
if s.root {
s.doneCh <- atomic.SwapInt32(s.refNum, -1)
return
}
s.collect <- s
}()
}
func (s *defaultSpan) createSegment() {
var init int32
s.refNum = &init
ch := make(chan Span)
s.collect = ch
s.notify = ch
s.segment = make([]Span, 0, 10)
s.doneCh = make(chan int32)
go func() {
total := -1
defer close(ch)
defer close(s.doneCh)
for {
select {
case span, ok := <-s.notify:
if !ok {
return
}
s.segment = append(s.segment, span)
case n := <-s.doneCh:
total = int(n)
}
if total == len(s.segment) {
break
}
}
s.tracer.reporter.Send(append(s.segment, s))
}()
}
// SpanOption allows for functional options to adjust behaviour
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册