fluentd.conf 2.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>

<source>
  @type tail
  path /tmp/skywalking-logs/*/e2e-service-provider.log
  read_from_head true
  tag skywalking_log
  refresh_interval 1
  <parse>
    @type regexp
    expression /^\[SW_CTX:\s*(?<SW_CTX>[^ ]*]*)\] \[(?<level>[^ ]*)\] (?<logtime>[^\]]*) \[(?<thread>[^ ]*)\] (?<logger>[^\]]*):(?<line>[^\]]*) - (?<body>[^\]]*)$/
  </parse>
</source>

<filter skywalking_log>
  @type parser
  key_name SW_CTX
  reserve_data true
  remove_key_name_field true
  <parse>
    @type regexp
    expression /^\[(?<service>[^\]]*),(?<serviceInstance>[^\]]*),(?<traceId>[^\]]*),(?<traceSegmentId>[^\]]*),(?<spanId>[^\]]*)\]$/
  </parse>
</filter>

<filter skywalking_log>
  @type record_transformer
  enable_ruby true
  <record>
    traceContext ${{"traceId" => record["traceId"], "traceSegmentId" => record["traceSegmentId"], "spanId" => record["spanId"]}}
    tags ${{"data" => [{"key" => "level", "value" => record["level"]}, {"key" => "thread", "value" => record["thread"]}, {"key" => "logger", "value" => record["logger"]}, {"key" => "agent", "value" => "fluentd"}]}}
    body ${{"text" => {"text" => record["body"]}}}
    timestamp ${DateTime.strptime(record["logtime"], '%Y-%m-%d %H:%M:%S.%L').strftime('%Q')}
  </record>
  remove_keys level,thread,logger,line,traceId,traceSegmentId,spanId,logtime
</filter>

<match skywalking_log>
  @type kafka2
  brokers broker-a:9092,broker-b:9092
  default_topic skywalking-logs-json
  <format>
    @type json
  </format>
</match>