packagepluginsimport("context""github.com/oceanbase/obagent/monitor/message")// Source Data sources, production datatypeSourceinterface{Start(outchan<-[]*message.Message)(errerror)Stop()}typeInputinterface{Collect(ctxcontext.Context)[]*message.Message}// Processor Process the datatypeProcessorinterface{Start(in<-chan[]*message.Message,outchan<-[]*message.Message)(errerror)Stop()}typeMapperinterface{Map(in*message.Message)*message.Message}// Sink Output the data in a specified mannertypeSinkinterface{Start(in<-chan[]*message.Message)errorStop()}typePipeFuncfunc(in<-chan[]*message.Message,outchan<-[]*message.Message)(errerror)