未验证 提交 7b23a978 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

Implement correlation protocol (#35)

* implement correlation

* fix luaunit and add doc and example code

* Adding document and rockspec file

* resolve document issue

* remove empty value

* fix code style
Co-authored-by: NMrproliu <mrproliu@lagou.com>
上级 9c0f664f
......@@ -73,6 +73,7 @@ jobs:
lua span_test.lua
lua tracing_context_test.lua
lua segment_ref_test.lua
lua correlation_context_test.lua
cd ..
- name: 'Run Nginx Lua Tests'
run: |
......
......@@ -49,6 +49,8 @@ http {
-- Currently, we can not have the upstream real network address
------------------------------------------------------
require("tracer"):start("upstream service")
-- If you want correlation custom data to the downstream service
-- require("tracer"):start("upstream service", {custom = "custom_value"})
}
-- Target upstream service
......@@ -118,7 +120,7 @@ The following APIs are for developers or using this lib out of the Nginx case.
## Nginx APIs
- **startTimer**, `require("client"):startBackendTimer("http://127.0.0.1:8080")`. Start the backend timer. This timer register the metadata and report traces to the backend.
- **start**, `require("tracer"):start("upstream service")`. Begin the tracing before the upstream begin.
- **start**, `require("tracer"):start("upstream service", correlation)`. Begin the tracing before the upstream begin. The custom data (table type) can be injected as the second parameter, and then they will be propagated to the downstream service.
- **finish**, `require("tracer"):finish()`. Finish the tracing for this HTTP request.
- **prepareForReport**, `require("tracer"):prepareForReport()`. Prepare the finished segment for further report.
......
......@@ -59,6 +59,8 @@ http {
-- Currently, we can not have the upstream real network address
------------------------------------------------------
require("tracer"):start("upstream service")
-- If you want correlation custom data to the downstream service
-- require("tracer"):start("upstream service", {custom = "custom_value"})
}
proxy_pass http://127.0.0.1:8080/tier2/lb;
......
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-- limit define
local ELEMENT_MAX_NUMBER = 3
local VALUE_MAX_LENGTH = 128
local Util = require('util')
local Base64 = require('dependencies/base64')
local encode_base64 = Base64.encode
local decode_base64 = Base64.decode
if Util.is_ngx_lua then
encode_base64 = ngx.encode_base64
decode_base64 = ngx.decode_base64
end
local _M = {}
function _M.new()
return {}
end
-- Deserialze value from the correlation context and initalize the context
function _M.fromSW8Value(value)
local context = _M.new()
if value == nil or #value == 0 then
return context
end
local data = Util.split(value, ',')
if #data == 0 then
return context
end
for i, per_data in ipairs(data)
do
if #data > ELEMENT_MAX_NUMBER then
return context
end
local parts = Util.split(per_data, ':')
if #parts == 2 then
local key = decode_base64(parts[1])
local value = decode_base64(parts[2])
context[key] = value
end
end
return context
end
-- Return string to represent this correlation context
function _M.serialize(context)
local encoded = ''
for name, value in pairs(context) do
if #encoded > 0 then
encoded = encoded .. ','
end
encoded = encoded .. encode_base64(name) .. ':' .. encode_base64(value)
end
return encoded
end
-- Put the custom key/value into correlation context.
function _M.put(context, key, value)
-- key must not null
if not key then
return
end
-- remove and return previous value when value is empty
if not value or #value == 0 then
context[key] = nil
return
end
-- check value length
if #value > VALUE_MAX_LENGTH then
return
end
-- already contain key, overwrite it
if context[key] then
context[key] = value
return
end
-- check keys count
local contextLength = 0
for k,v in pairs(context) do
contextLength = contextLength + 1
end
if contextLength >= ELEMENT_MAX_NUMBER then
return
end
-- setting
context[key] = value
end
return _M
\ No newline at end of file
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local lu = require('luaunit')
local correlationContext = require('correlation_context')
local TC = require('tracing_context')
TestCorelationContext = {}
function TestCorelationContext:testFromSW8Value()
-- simple analyze
local context = correlationContext.fromSW8Value('dGVzdDE=:dDE=,dGVzdDI=:dDI=')
lu.assertNotNil(context)
lu.assertEquals(context["test1"], "t1")
lu.assertEquals(context["test2"], "t2")
-- analyze with empty value
context = correlationContext.fromSW8Value('dGVzdDE=:')
lu.assertNotNil(context)
lu.assertNil(context["test1"])
-- analyze with empty header
context = correlationContext.fromSW8Value('')
lu.assertNotNil(context)
lu.assertNotNil(#context == 0)
end
function TestCorelationContext:testSerialize()
-- serialize empty correlation
local context = correlationContext.fromSW8Value('')
local encode_context = correlationContext.serialize(context)
lu.assertNotNil(encode_context)
lu.assertEquals(encode_context, "")
-- serialize with multiple value
context = correlationContext.fromSW8Value('')
correlationContext.put(context, "test1", "t1")
correlationContext.put(context, "test2", "t2")
encode_context = correlationContext.serialize(context)
lu.assertNotNil(encode_context)
context = correlationContext.fromSW8Value(encode_context)
lu.assertNotNil(context)
lu.assertEquals(context["test1"], "t1")
lu.assertEquals(context["test2"], "t2")
-- serialize with empty value
context = correlationContext.fromSW8Value('')
correlationContext.put(context, "test1", "")
encode_context = correlationContext.serialize(context)
lu.assertNotNil(encode_context)
lu.assertEquals(encode_context, "")
end
function TestCorelationContext:testPut()
-- put with empty key and value
local context = correlationContext.fromSW8Value('')
correlationContext.put(context, nil, nil)
lu.assertEquals(correlationContext.serialize(context), '')
-- put nil to remove key
correlationContext.put(context, "test1", "t1")
correlationContext.put(context, "test1", nil)
lu.assertEquals(correlationContext.serialize(context), '')
-- overflow put
correlationContext.put(context, "test1", "t1")
correlationContext.put(context, "test2", "t2")
correlationContext.put(context, "test3", "t3")
correlationContext.put(context, "test4", "t4")
local encode_context = correlationContext.serialize(context)
lu.assertNotNil(encode_context)
local context = correlationContext.fromSW8Value(encode_context)
lu.assertEquals(context["test1"], "t1")
lu.assertEquals(context["test2"], "t2")
lu.assertEquals(context["test3"], "t3")
end
function TestCorelationContext:testTracingContext()
-- transform data
local context = TC.new("service", "instance")
local header = {}
header["sw8-correlation"] = 'dGVzdDI=:dDI=,dGVzdDE=:dDE=,dGVzdDM=:dDM='
TC.createEntrySpan(context, 'operation_name', nil, header)
lu.assertNotNil(context.correlation)
local contextCarrier = {}
TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier)
lu.assertNotNil(contextCarrier['sw8-correlation'])
local correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
lu.assertEquals(correlation["test1"], "t1")
lu.assertEquals(correlation["test2"], "t2")
-- transform data with adding data
TC.createExitSpan(context, 'operation_name', nil, 'peer', contextCarrier, {
test3 = "t3"
})
lu.assertNotNil(contextCarrier['sw8-correlation'])
correlation = correlationContext.fromSW8Value(contextCarrier['sw8-correlation'])
lu.assertEquals(correlation["test1"], "t1")
lu.assertEquals(correlation["test2"], "t2")
lu.assertEquals(correlation["test3"], "t3")
end
-- end TestTracingContext
os.exit( lu.LuaUnit.run() )
......@@ -18,7 +18,7 @@ local Span = require('span')
local Tracer = {}
function Tracer:start(upstream_name)
function Tracer:start(upstream_name, correlation)
local metadata_buffer = ngx.shared.tracing_buffer
local TC = require('tracing_context')
local Layer = require('span_layer')
......@@ -34,6 +34,7 @@ function Tracer:start(upstream_name)
local contextCarrier = {}
contextCarrier["sw8"] = ngx.req.get_headers()["sw8"]
contextCarrier["sw8-correlation"] = ngx.req.get_headers()["sw8-correlation"]
local entrySpan = TC.createEntrySpan(tracingContext, ngx.var.uri, nil, contextCarrier)
Span.start(entrySpan, ngx.now() * 1000)
Span.setComponentId(entrySpan, nginxComponentId)
......@@ -49,7 +50,7 @@ function Tracer:start(upstream_name)
local upstreamServerName = upstream_name
------------------------------------------------------
local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier)
local exitSpan = TC.createExitSpan(tracingContext, upstreamUri, entrySpan, upstreamServerName, contextCarrier, correlation)
Span.start(exitSpan, ngx.now() * 1000)
Span.setComponentId(exitSpan, nginxComponentId)
Span.setLayer(exitSpan, Layer.HTTP)
......
......@@ -17,6 +17,9 @@
local Util = require('util')
local Span = require('span')
local CorrelationContext = require('correlation_context')
local CONTEXT_CORRELATION_KEY = 'sw8-correlation'
-------------- Internal Object-------------
local Internal = {}
......@@ -126,16 +129,32 @@ function _M.createEntrySpan(tracingContext, operationName, parent, contextCarrie
return Span.newNoOP()
end
local correlationData = ''
if contextCarrier then
correlationData = contextCarrier[CONTEXT_CORRELATION_KEY]
end
tracingContext.correlation = CorrelationContext.fromSW8Value(correlationData)
return Span.createEntrySpan(operationName, tracingContext, parent, contextCarrier)
end
-- Delegate to Span.createExitSpan
-- @param contextCarrier could be nil if don't need to inject any context to propagate
function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier)
function _M.createExitSpan(tracingContext, operationName, parent, peer, contextCarrier, correlation)
if tracingContext.is_noop then
return Span.newNoOP()
end
if contextCarrier then
if correlation then
for name, value in pairs(correlation) do
CorrelationContext.put(tracingContext.correlation, name, value)
end
end
contextCarrier[CONTEXT_CORRELATION_KEY] = CorrelationContext.serialize(tracingContext.correlation)
end
return Span.createExitSpan(operationName, tracingContext, parent, peer, contextCarrier)
end
......
......@@ -26,5 +26,6 @@ build = {
["skywalking.span"] = "lib/skywalking/span.lua",
["skywalking.tracing_context"] = "lib/skywalking/tracing_context.lua",
["skywalking.util"] = "lib/skywalking/util.lua",
["skywalking.correlation_context"] = "lib/skywalking/correlation_context.lua",
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册