未验证 提交 12379661 编写于 作者: G Gao Hongtao 提交者: GitHub

Abstract Segment Implement (#1)

* build system

* segment implement

* circle ci
上级 6c6f7ca5
version: 2.1
workflows:
version: 2
api_verify:
jobs:
- verify
executors:
build:
docker:
- image: circleci/golang:1.11
working_directory: /go/src/github.com/tetratelabs/go2sky
jobs:
verify:
executor: build
steps:
- checkout
- run: make vet
- run: make test
.DEFAULT_GOAL := test
GO111MODULE:=on
.PHONY: test
test:
go test -v -race -cover ./...
.PHONY: lint
lint:
# Ignore grep's exit code since no match returns 1.
echo 'linting...' ; golint ./...
.PHONY: vet
vet:
go vet ./...
.PHONY: all
all: vet lint test
.PHONY: example
# GO2Sky
[![CircleCI](https://circleci.com/gh/tetratelabs/go2sky.svg?style=shield)](https://circleci.com/gh/tetratelabs/go2sky)
**GO2Sky** is an intrument SDK library, writen in Go, by following [Apache SkyWalking](https://github.com/apache/incubator-skywalking) tracing and metric formats.
......
package propagation
//CarrierItem is sub entity of propagation specification
type CarrierItem interface {
HeadKey() string
HeadValue() string
SetValue(t string)
IsValid() bool
}
type sw3CarrierItem struct {
}
func (s *sw3CarrierItem) HeadKey() string {
return "sw3"
}
func (s *sw3CarrierItem) HeadValue() string {
return ""
}
func (s *sw3CarrierItem) SetValue(t string) {
}
func (s *sw3CarrierItem) IsValid() bool {
return true
}
//NewSW3CarrierItem create a new SkyWalking v3 propagation protocol carrier object
func NewSW3CarrierItem() CarrierItem {
item := new(sw3CarrierItem)
return item
}
// ContextCarrier is a data carrier of tracing context,
// it holds a snapshot for across process propagation.
type ContextCarrier struct {
items []CarrierItem
}
// GetAllItems gets all data from ContextCarrier
func (c *ContextCarrier) GetAllItems() []CarrierItem {
return c.items
}
// NewContextCarrier create a new ContextCarrier object
func NewContextCarrier() *ContextCarrier {
carrier := ContextCarrier{items: []CarrierItem{
NewSW3CarrierItem(),
}}
return &carrier
}
// Extractor is a tool specification which define how to
// extract trace parent context from propagation context
type Extractor func() (ContextCarrier, error)
// Injector is a tool specification which define how to
// inject trace context into propagation context
type Injector func(carrier *ContextCarrier) error
package go2sky
import (
"context"
"fmt"
"sync"
"testing"
"github.com/tetratelabs/go2sky/propagation"
)
func TestSyncSegment(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
mr := MockReporter{
WaitGroup: wg,
}
tracer, _ := NewTracer(WithReporter(&mr))
ctx := context.Background()
span, ctx, _ := tracer.CreateEntrySpan(ctx, MockExtractor)
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
eSpan.End()
span.End()
wg.Wait()
if err := mr.Verify(2); err != nil {
t.Error(err)
}
}
func TestAsyncSingleSegment(t *testing.T) {
reportWg := &sync.WaitGroup{}
exitWg := &sync.WaitGroup{}
reportWg.Add(1)
exitWg.Add(2)
mr := MockReporter{
WaitGroup: reportWg,
}
tracer, _ := NewTracer(WithReporter(&mr))
ctx := context.Background()
span, ctx, _ := tracer.CreateEntrySpan(ctx, MockExtractor)
go func() {
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
eSpan.End()
exitWg.Done()
}()
go func() {
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
eSpan.End()
exitWg.Done()
}()
exitWg.Wait()
span.End()
reportWg.Wait()
if err := mr.Verify(3); err != nil {
t.Error(err)
}
}
func TestAsyncMultipleSegments(t *testing.T) {
reportWg := &sync.WaitGroup{}
reportWg.Add(1)
mr := MockReporter{
WaitGroup: reportWg,
}
tracer, _ := NewTracer(WithReporter(&mr))
ctx := context.Background()
span, ctx, _ := tracer.CreateEntrySpan(ctx, MockExtractor)
span.End()
reportWg.Wait()
reportWg.Add(2)
go func() {
oSpan, ctx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
eSpan.End()
oSpan.End()
}()
go func() {
oSpan, ctx, _ := tracer.CreateLocalSpan(ctx)
eSpan, _ := tracer.CreateExitSpan(ctx, MockInjector)
eSpan.End()
oSpan.End()
}()
reportWg.Wait()
if err := mr.Verify(1, 2, 2); err != nil {
t.Error(err)
}
}
func MockExtractor() (c propagation.ContextCarrier, e error) {
return
}
func MockInjector(carrier *propagation.ContextCarrier) (e error) {
carrier.GetAllItems()
return
}
type Segment []Span
type MockReporter struct {
Message []Segment
*sync.WaitGroup
sync.Mutex
}
func (r *MockReporter) Send(spans []Span) {
r.Mutex.Lock()
defer r.Mutex.Unlock()
r.Message = append(r.Message, spans)
r.WaitGroup.Done()
}
func (r *MockReporter) Verify(mm ...int) error {
if len(mm) != len(r.Message) {
return fmt.Errorf("message size mismatch. expected %d actual %d", len(mm), len(r.Message))
}
for i, m := range mm {
if m != len(r.Message[i]) {
return fmt.Errorf("span size mismatch. expected %d actual %d", len(mm), len(r.Message))
}
}
return nil
}
func (r *MockReporter) Close() {
}
package go2sky
import "github.com/tetratelabs/go2sky/propagation"
// WithParent setup parent context from propagation
func WithParent(cc propagation.ContextCarrier) SpanOption {
return func(s *defaultSpan) {
s.ContextCarrier = cc
}
}
package go2sky
import (
"context"
"sync/atomic"
"github.com/tetratelabs/go2sky/propagation"
)
// Tracer is go2sky tracer implementation.
type Tracer struct {
serviceCode string
reporter Reporter
}
// TracerOption allows for functional options to adjust behaviour
// of a Tracer to be created by NewTracer
type TracerOption func(t *Tracer)
// NewTracer return a new go2sky Tracer
func NewTracer(opts ...TracerOption) (tracer *Tracer, err error) {
t := &Tracer{}
for _, opt := range opts {
opt(t)
}
return t, nil
}
// CreateEntrySpan creates and starts an entry span for incoming request
func (t *Tracer) CreateEntrySpan(ctx context.Context, extractor propagation.Extractor) (Span, context.Context, error) {
cc, err := extractor()
if err != nil {
return nil, nil, err
}
return t.CreateLocalSpan(ctx, WithParent(cc))
}
// CreateLocalSpan creates and starts a span for local usage
func (t *Tracer) CreateLocalSpan(ctx context.Context, opts ...SpanOption) (Span, context.Context, error) {
root := true
if parentSpan, ok := ctx.Value(key).(Span); ok && parentSpan != nil {
opts = append(opts, WithParent(parentSpan.Context()))
if parentRootSpan, okk := parentSpan.(SegmentSpan); okk {
root = !parentRootSpan.SegmentRegister()
opts = append(opts, func(s *defaultSpan) {
s.segmentContext = parentRootSpan.SegmentContext()
})
}
}
s := &defaultSpan{
tracer: t,
root: root,
}
for _, opt := range opts {
opt(s)
}
if root {
s.createSegment()
}
return s, context.WithValue(ctx, key, s), nil
}
// CreateExitSpan creates and starts an exit span for client
func (t *Tracer) CreateExitSpan(ctx context.Context, injector propagation.Injector) (Span, error) {
s, _, err := t.CreateLocalSpan(ctx)
if err != nil {
return nil, err
}
cc := s.Context()
err = injector(&cc)
if err != nil {
return nil, err
}
return s, nil
}
// Span interface as common span specification
type Span interface {
Context() propagation.ContextCarrier
End()
}
// SegmentSpan interface as segment span specification
type SegmentSpan interface {
SegmentRegister() bool
SegmentContext() segmentContext
}
type defaultSpan struct {
propagation.ContextCarrier
segmentContext
root bool
notify <-chan Span
segment []Span
doneCh chan int32
tracer *Tracer
}
type segmentContext struct {
collect chan<- Span
refNum *int32
}
func (s *defaultSpan) Context() propagation.ContextCarrier {
return s.ContextCarrier
}
func (s *defaultSpan) SegmentRegister() bool {
for {
o := atomic.LoadInt32(s.refNum)
if o < 0 {
return false
}
if atomic.CompareAndSwapInt32(s.refNum, o, o+1) {
return true
}
}
}
func (s *defaultSpan) SegmentContext() segmentContext {
return s.segmentContext
}
func (s *defaultSpan) End() {
go func() {
if s.root {
s.doneCh <- atomic.SwapInt32(s.refNum, -1)
return
}
s.collect <- s
}()
}
func (s *defaultSpan) createSegment() {
var init int32
s.refNum = &init
ch := make(chan Span)
s.collect = ch
s.notify = ch
s.segment = make([]Span, 0, 10)
s.doneCh = make(chan int32)
go func() {
total := -1
defer close(ch)
defer close(s.doneCh)
for {
select {
case span, ok := <-s.notify:
if !ok {
return
}
s.segment = append(s.segment, span)
case n := <-s.doneCh:
total = int(n)
}
if total == len(s.segment) {
break
}
}
s.tracer.reporter.Send(append(s.segment, s))
}()
}
// SpanOption allows for functional options to adjust behaviour
// of a Span to be created by CreateLocalSpan
type SpanOption func(s *defaultSpan)
type ctxKey struct{}
var key = ctxKey{}
//Reporter is a data transit specification
type Reporter interface {
Send(spans []Span)
Close()
}
package go2sky
// WithReporter setup report pipeline for tracer
func WithReporter(reporter Reporter) TracerOption{
return func(t *Tracer) {
t.reporter = reporter
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册