service.go 14.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15 16 17
package server

import (
F
fatedier 已提交
18
	"bytes"
F
fatedier 已提交
19
	"context"
F
fatedier 已提交
20
	"crypto/tls"
F
fatedier 已提交
21
	"fmt"
F
fatedier 已提交
22
	"io/ioutil"
F
fatedier 已提交
23 24
	"net"
	"net/http"
25
	"sort"
Y
yuyulei 已提交
26
	"strconv"
F
fatedier 已提交
27 28 29
	"time"

	"github.com/fatedier/frp/assets"
F
fatedier 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43
	"github.com/fatedier/frp/pkg/auth"
	"github.com/fatedier/frp/pkg/config"
	modelmetrics "github.com/fatedier/frp/pkg/metrics"
	"github.com/fatedier/frp/pkg/msg"
	"github.com/fatedier/frp/pkg/nathole"
	plugin "github.com/fatedier/frp/pkg/plugin/server"
	"github.com/fatedier/frp/pkg/transport"
	"github.com/fatedier/frp/pkg/util/log"
	frpNet "github.com/fatedier/frp/pkg/util/net"
	"github.com/fatedier/frp/pkg/util/tcpmux"
	"github.com/fatedier/frp/pkg/util/util"
	"github.com/fatedier/frp/pkg/util/version"
	"github.com/fatedier/frp/pkg/util/vhost"
	"github.com/fatedier/frp/pkg/util/xlog"
F
fatedier 已提交
44
	"github.com/fatedier/frp/server/controller"
F
fatedier 已提交
45
	"github.com/fatedier/frp/server/group"
46
	"github.com/fatedier/frp/server/metrics"
F
fatedier 已提交
47
	"github.com/fatedier/frp/server/ports"
F
fatedier 已提交
48
	"github.com/fatedier/frp/server/proxy"
T
Tank 已提交
49
	"github.com/fatedier/frp/server/visitor"
50

F
fatedier 已提交
51
	"github.com/fatedier/golib/net/mux"
F
fatedier 已提交
52
	fmux "github.com/hashicorp/yamux"
F
fatedier 已提交
53 54
)

F
fatedier 已提交
55
const (
	// connReadTimeout bounds the wait for the first message on a freshly
	// accepted connection and is also the timeout used for the TLS check
	// in HandleListener.
	connReadTimeout time.Duration = 10 * time.Second

	// vhostReadWriteTimeout is the timeout handed to the tcpmux and https
	// vhost muxers when they are created in NewService.
	vhostReadWriteTimeout time.Duration = 30 * time.Second
)

60
// Server service
F
fatedier 已提交
61
type Service struct {
62
	// Dispatch connections to different handlers listen on same port
63 64
	muxer *mux.Mux

65
	// Accept connections from client
F
fatedier 已提交
66
	listener net.Listener
F
fatedier 已提交
67

68
	// Accept connections using kcp
F
fatedier 已提交
69
	kcpListener net.Listener
F
fatedier 已提交
70

F
FishFish 已提交
71
	// Accept connections using websocket
F
fatedier 已提交
72
	websocketListener net.Listener
F
FishFish 已提交
73

F
fatedier 已提交
74
	// Accept frp tls connections
F
fatedier 已提交
75
	tlsListener net.Listener
F
fatedier 已提交
76

F
fatedier 已提交
77 78 79 80
	// Manage all controllers
	ctlManager *ControlManager

	// Manage all proxies
F
fatedier 已提交
81
	pxyManager *proxy.Manager
F
fatedier 已提交
82

F
fatedier 已提交
83 84 85
	// Manage all plugins
	pluginManager *plugin.Manager

F
fatedier 已提交
86
	// HTTP vhost router
F
fatedier 已提交
87
	httpVhostRouter *vhost.Routers
F
fatedier 已提交
88

F
fatedier 已提交
89
	// All resource managers and controllers
F
fatedier 已提交
90 91
	rc *controller.ResourceController

92 93 94
	// Verifies authentication based on selected method
	authVerifier auth.Verifier

F
fatedier 已提交
95
	tlsConfig *tls.Config
96 97

	cfg config.ServerCommonConf
F
fatedier 已提交
98 99
}

100
func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
101 102 103 104 105 106 107 108
	tlsConfig, err := transport.NewServerTLSConfig(
		cfg.TLSCertFile,
		cfg.TLSKeyFile,
		cfg.TLSTrustedCaFile)
	if err != nil {
		return
	}

F
fatedier 已提交
109
	svr = &Service{
F
fatedier 已提交
110
		ctlManager:    NewControlManager(),
F
fatedier 已提交
111
		pxyManager:    proxy.NewManager(),
F
fatedier 已提交
112
		pluginManager: plugin.NewManager(),
F
fatedier 已提交
113
		rc: &controller.ResourceController{
F
fatedier 已提交
114 115 116
			VisitorManager: visitor.NewManager(),
			TCPPortManager: ports.NewManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
			UDPPortManager: ports.NewManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
F
fatedier 已提交
117
		},
F
fatedier 已提交
118 119
		httpVhostRouter: vhost.NewRouters(),
		authVerifier:    auth.NewAuthVerifier(cfg.ServerConfig),
120
		tlsConfig:       tlsConfig,
121
		cfg:             cfg,
F
fatedier 已提交
122
	}
F
fatedier 已提交
123

G
Guy Lewin 已提交
124
	// Create tcpmux httpconnect multiplexer.
F
fatedier 已提交
125
	if cfg.TCPMuxHTTPConnectPort > 0 {
G
Guy Lewin 已提交
126
		var l net.Listener
F
fatedier 已提交
127
		l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.TCPMuxHTTPConnectPort))
G
Guy Lewin 已提交
128 129 130 131 132
		if err != nil {
			err = fmt.Errorf("Create server listener error, %v", err)
			return
		}

F
fatedier 已提交
133
		svr.rc.TCPMuxHTTPConnectMuxer, err = tcpmux.NewHTTPConnectTCPMuxer(l, vhostReadWriteTimeout)
G
Guy Lewin 已提交
134 135 136 137
		if err != nil {
			err = fmt.Errorf("Create vhost tcpMuxer error, %v", err)
			return
		}
F
fatedier 已提交
138
		log.Info("tcpmux httpconnect multiplexer listen on %s:%d", cfg.ProxyBindAddr, cfg.TCPMuxHTTPConnectPort)
G
Guy Lewin 已提交
139 140
	}

F
fatedier 已提交
141
	// Init all plugins
142 143 144 145 146 147 148 149
	plugin_names := make([]string, 0, len(cfg.HTTPPlugins))
	for n := range cfg.HTTPPlugins {
		plugin_names = append(plugin_names, n)
	}
	sort.Strings(plugin_names)

	for _, name := range plugin_names {
		svr.pluginManager.Register(plugin.NewHTTPPluginOptions(cfg.HTTPPlugins[name]))
F
fatedier 已提交
150 151
		log.Info("plugin [%s] has been registered", name)
	}
152
	svr.rc.PluginManager = svr.pluginManager
F
fatedier 已提交
153

F
fatedier 已提交
154
	// Init group controller
F
fatedier 已提交
155
	svr.rc.TCPGroupCtl = group.NewTCPGroupCtl(svr.rc.TCPPortManager)
F
fatedier 已提交
156

F
fatedier 已提交
157 158 159
	// Init HTTP group controller
	svr.rc.HTTPGroupCtl = group.NewHTTPGroupController(svr.httpVhostRouter)

G
Guy Lewin 已提交
160
	// Init TCP mux group controller
F
fatedier 已提交
161
	svr.rc.TCPMuxGroupCtl = group.NewTCPMuxGroupCtl(svr.rc.TCPMuxHTTPConnectMuxer)
G
Guy Lewin 已提交
162

F
fatedier 已提交
163 164 165
	// Init 404 not found page
	vhost.NotFoundPagePath = cfg.Custom404Page

166 167 168 169 170
	var (
		httpMuxOn  bool
		httpsMuxOn bool
	)
	if cfg.BindAddr == cfg.ProxyBindAddr {
F
fatedier 已提交
171
		if cfg.BindPort == cfg.VhostHTTPPort {
172 173
			httpMuxOn = true
		}
F
fatedier 已提交
174
		if cfg.BindPort == cfg.VhostHTTPSPort {
175 176 177 178
			httpsMuxOn = true
		}
	}

F
fatedier 已提交
179
	// Listen for accepting connections from client.
Y
yuyulei 已提交
180 181
	address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.BindPort))
	ln, err := net.Listen("tcp", address)
F
fatedier 已提交
182 183 184 185
	if err != nil {
		err = fmt.Errorf("Create server listener error, %v", err)
		return
	}
F
FishFish 已提交
186

F
fix ci  
fatedier 已提交
187 188
	svr.muxer = mux.NewMux(ln)
	go svr.muxer.Serve()
F
FishFish 已提交
189 190
	ln = svr.muxer.DefaultListener()

F
fatedier 已提交
191
	svr.listener = ln
Y
yuyulei 已提交
192
	log.Info("frps tcp listen on %s", address)
F
fatedier 已提交
193 194

	// Listen for accepting connections from client using kcp protocol.
F
fatedier 已提交
195
	if cfg.KCPBindPort > 0 {
Y
yuyulei 已提交
196 197
		address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.KCPBindPort))
		svr.kcpListener, err = frpNet.ListenKcp(address)
F
fatedier 已提交
198
		if err != nil {
Y
yuyulei 已提交
199
			err = fmt.Errorf("Listen on kcp address udp %s error: %v", address, err)
F
fatedier 已提交
200 201
			return
		}
F
fatedier 已提交
202
		log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.KCPBindPort)
F
fatedier 已提交
203
	}
F
fatedier 已提交
204

F
fatedier 已提交
205
	// Listen for accepting connections from client using websocket protocol.
F
fatedier 已提交
206
	websocketPrefix := []byte("GET " + frpNet.FrpWebsocketPath)
F
fatedier 已提交
207 208 209 210 211
	websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
		return bytes.Equal(data, websocketPrefix)
	})
	svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)

F
fatedier 已提交
212
	// Create http vhost muxer.
F
fatedier 已提交
213 214 215
	if cfg.VhostHTTPPort > 0 {
		rp := vhost.NewHTTPReverseProxy(vhost.HTTPReverseProxyOptions{
			ResponseHeaderTimeoutS: cfg.VhostHTTPTimeout,
F
fatedier 已提交
216
		}, svr.httpVhostRouter)
F
fatedier 已提交
217
		svr.rc.HTTPReverseProxy = rp
F
fatedier 已提交
218

Y
yuyulei 已提交
219
		address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.VhostHTTPPort))
F
fatedier 已提交
220 221 222
		server := &http.Server{
			Addr:    address,
			Handler: rp,
F
fatedier 已提交
223
		}
F
fatedier 已提交
224
		var l net.Listener
225
		if httpMuxOn {
F
fatedier 已提交
226
			l = svr.muxer.ListenHttp(1)
227 228 229 230 231 232
		} else {
			l, err = net.Listen("tcp", address)
			if err != nil {
				err = fmt.Errorf("Create vhost http listener error, %v", err)
				return
			}
F
fatedier 已提交
233
		}
F
fatedier 已提交
234
		go server.Serve(l)
F
fatedier 已提交
235
		log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHTTPPort)
F
fatedier 已提交
236 237 238
	}

	// Create https vhost muxer.
F
fatedier 已提交
239
	if cfg.VhostHTTPSPort > 0 {
240 241
		var l net.Listener
		if httpsMuxOn {
F
fatedier 已提交
242
			l = svr.muxer.ListenHttps(1)
243
		} else {
Y
yuyulei 已提交
244 245
			address := net.JoinHostPort(cfg.ProxyBindAddr, strconv.Itoa(cfg.VhostHTTPSPort))
			l, err = net.Listen("tcp", address)
246 247 248 249
			if err != nil {
				err = fmt.Errorf("Create server listener error, %v", err)
				return
			}
Y
yuyulei 已提交
250
			log.Info("https service listen on %s", address)
F
fatedier 已提交
251
		}
252

F
fatedier 已提交
253
		svr.rc.VhostHTTPSMuxer, err = vhost.NewHTTPSMuxer(l, vhostReadWriteTimeout)
F
fatedier 已提交
254 255 256 257
		if err != nil {
			err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
			return
		}
F
fatedier 已提交
258 259
	}

F
fatedier 已提交
260
	// frp tls listener
F
fatedier 已提交
261
	svr.tlsListener = svr.muxer.Listen(1, 1, func(data []byte) bool {
F
fatedier 已提交
262
		return int(data[0]) == frpNet.FRPTLSHeadByte
F
fatedier 已提交
263 264
	})

F
fatedier 已提交
265
	// Create nat hole controller.
F
fatedier 已提交
266 267
	if cfg.BindUDPPort > 0 {
		var nc *nathole.Controller
268
		address := net.JoinHostPort(cfg.BindAddr, strconv.Itoa(cfg.BindUDPPort))
Y
yuyulei 已提交
269
		nc, err = nathole.NewController(address)
F
fatedier 已提交
270 271 272 273
		if err != nil {
			err = fmt.Errorf("Create nat hole controller error, %v", err)
			return
		}
F
fatedier 已提交
274
		svr.rc.NatHoleController = nc
Y
yuyulei 已提交
275
		log.Info("nat hole udp service listen on %s", address)
F
fatedier 已提交
276 277
	}

F
fatedier 已提交
278
	var statsEnable bool
F
fatedier 已提交
279
	// Create dashboard web server.
F
fatedier 已提交
280
	if cfg.DashboardPort > 0 {
281 282 283 284 285 286 287
		// Init dashboard assets
		err = assets.Load(cfg.AssetsDir)
		if err != nil {
			err = fmt.Errorf("Load assets error: %v", err)
			return
		}

Y
yuyulei 已提交
288 289
		address := net.JoinHostPort(cfg.DashboardAddr, strconv.Itoa(cfg.DashboardPort))
		err = svr.RunDashboardServer(address)
F
fatedier 已提交
290 291 292 293
		if err != nil {
			err = fmt.Errorf("Create dashboard web server error, %v", err)
			return
		}
T
timerever 已提交
294
		log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
F
fatedier 已提交
295
		statsEnable = true
F
fatedier 已提交
296
	}
297 298 299 300 301 302
	if statsEnable {
		modelmetrics.EnableMem()
		if cfg.EnablePrometheus {
			modelmetrics.EnablePrometheus()
		}
	}
F
fatedier 已提交
303 304 305 306
	return
}

func (svr *Service) Run() {
F
fatedier 已提交
307 308
	if svr.rc.NatHoleController != nil {
		go svr.rc.NatHoleController.Run()
F
fatedier 已提交
309
	}
F
fatedier 已提交
310
	if svr.cfg.KCPBindPort > 0 {
F
fatedier 已提交
311 312 313
		go svr.HandleListener(svr.kcpListener)
	}

F
fatedier 已提交
314
	go svr.HandleListener(svr.websocketListener)
F
fatedier 已提交
315
	go svr.HandleListener(svr.tlsListener)
F
fatedier 已提交
316

F
fatedier 已提交
317 318 319
	svr.HandleListener(svr.listener)
}

320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
// handleConnection reads the first message from conn (waiting at most
// connReadTimeout) and dispatches on its type:
//
//   - Login: run the login plugin hook, then register a control.
//   - NewWorkConn: hand the connection to the owning control.
//   - NewVisitorConn: register the visitor and reply with the outcome.
//
// Any other message type, or a failure in one of the steps above, closes conn.
func (svr *Service) handleConnection(ctx context.Context, conn net.Conn) {
	xl := xlog.FromContextSafe(ctx)

	// The deadline only applies to the first message; it is cleared afterwards.
	conn.SetReadDeadline(time.Now().Add(connReadTimeout))
	rawMsg, err := msg.ReadMsg(conn)
	if err != nil {
		log.Trace("Failed to read message: %v", err)
		conn.Close()
		return
	}
	conn.SetReadDeadline(time.Time{})

	switch m := rawMsg.(type) {
	case *msg.Login:
		// server plugin hook
		loginMsg := m
		retContent, loginErr := svr.pluginManager.Login(&plugin.LoginContent{Login: *m})
		if loginErr == nil {
			loginMsg = &retContent.Login
			loginErr = svr.RegisterControl(conn, loginMsg)
		}
		if loginErr == nil {
			// The success message is sent in control's work goroutine.
			return
		}

		// Login failed: report the error to the client here.
		xl.Warn("register control error: %v", loginErr)
		msg.WriteMsg(conn, &msg.LoginResp{
			Version: version.Full(),
			Error:   util.GenerateResponseErrorString("register control error", loginErr, svr.cfg.DetailedErrorsToClient),
		})
		conn.Close()

	case *msg.NewWorkConn:
		if regErr := svr.RegisterWorkConn(conn, m); regErr != nil {
			conn.Close()
		}

	case *msg.NewVisitorConn:
		err = svr.RegisterVisitorConn(conn, m)
		resp := &msg.NewVisitorConnResp{
			ProxyName: m.ProxyName,
			Error:     "",
		}
		if err != nil {
			xl.Warn("register visitor conn error: %v", err)
			resp.Error = util.GenerateResponseErrorString("register visitor conn error", err, svr.cfg.DetailedErrorsToClient)
		}
		msg.WriteMsg(conn, resp)
		if err != nil {
			conn.Close()
		}

	default:
		log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
		conn.Close()
	}
}

F
fatedier 已提交
382
func (svr *Service) HandleListener(l net.Listener) {
F
fatedier 已提交
383 384
	// Listen for incoming connections from client.
	for {
F
fatedier 已提交
385
		c, err := l.Accept()
F
fatedier 已提交
386 387 388 389
		if err != nil {
			log.Warn("Listener for incoming connections from client closed")
			return
		}
F
fatedier 已提交
390 391
		// inject xlog object into net.Conn context
		xl := xlog.New()
392 393
		ctx := context.Background()

F
fatedier 已提交
394
		c = frpNet.NewContextConn(xlog.NewContext(ctx, xl), c)
395 396 397

		log.Trace("start check TLS connection...")
		originConn := c
F
fatedier 已提交
398
		c, err = frpNet.CheckAndEnableTLSServerConnWithTimeout(c, svr.tlsConfig, svr.cfg.TLSOnly, connReadTimeout)
399 400 401 402 403 404
		if err != nil {
			log.Warn("CheckAndEnableTLSServerConnWithTimeout error: %v", err)
			originConn.Close()
			continue
		}
		log.Trace("success check TLS connection")
F
fatedier 已提交
405 406

		// Start a new goroutine for dealing connections.
407
		go func(ctx context.Context, frpConn net.Conn) {
F
fatedier 已提交
408
			if svr.cfg.TCPMux {
F
fatedier 已提交
409
				fmuxCfg := fmux.DefaultConfig()
F
fatedier 已提交
410
				fmuxCfg.KeepAliveInterval = 20 * time.Second
F
fatedier 已提交
411 412
				fmuxCfg.LogOutput = ioutil.Discard
				session, err := fmux.Server(frpConn, fmuxCfg)
F
fatedier 已提交
413
				if err != nil {
414
					log.Warn("Failed to create mux connection: %v", err)
F
fatedier 已提交
415
					frpConn.Close()
416 417 418 419 420 421
					return
				}

				for {
					stream, err := session.AcceptStream()
					if err != nil {
422
						log.Debug("Accept new mux stream error: %v", err)
F
fatedier 已提交
423
						session.Close()
424 425
						return
					}
426
					go svr.handleConnection(ctx, stream)
F
fatedier 已提交
427
				}
428
			} else {
429
				svr.handleConnection(ctx, frpConn)
F
fatedier 已提交
430
			}
431
		}(ctx, c)
F
fatedier 已提交
432 433 434
	}
}

F
fatedier 已提交
435
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) {
F
fatedier 已提交
436
	// If client's RunID is empty, it's a new client, we just create a new controller.
F
fatedier 已提交
437
	// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
F
fatedier 已提交
438 439
	if loginMsg.RunID == "" {
		loginMsg.RunID, err = util.RandID()
F
fatedier 已提交
440 441 442 443 444 445 446
		if err != nil {
			return
		}
	}

	ctx := frpNet.NewContextFromConn(ctlConn)
	xl := xlog.FromContextSafe(ctx)
F
fatedier 已提交
447
	xl.AppendPrefix(loginMsg.RunID)
F
fatedier 已提交
448 449
	ctx = xlog.NewContext(ctx, xl)
	xl.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
F
fatedier 已提交
450 451 452 453 454 455 456 457 458
		ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)

	// Check client version.
	if ok, msg := version.Compat(loginMsg.Version); !ok {
		err = fmt.Errorf("%s", msg)
		return
	}

	// Check auth.
459
	if err = svr.authVerifier.VerifyLogin(loginMsg); err != nil {
F
fatedier 已提交
460 461 462
		return
	}

463
	ctl := NewControl(ctx, svr.rc, svr.pxyManager, svr.pluginManager, svr.authVerifier, ctlConn, loginMsg, svr.cfg)
F
fatedier 已提交
464
	if oldCtl := svr.ctlManager.Add(loginMsg.RunID, ctl); oldCtl != nil {
F
fatedier 已提交
465
		oldCtl.allShutdown.WaitDone()
F
fatedier 已提交
466 467 468
	}

	ctl.Start()
469 470

	// for statistics
471
	metrics.Server.NewClient()
F
fatedier 已提交
472 473 474 475

	go func() {
		// block until control closed
		ctl.WaitClosed()
F
fatedier 已提交
476
		svr.ctlManager.Del(loginMsg.RunID, ctl)
F
fatedier 已提交
477
	}()
F
fatedier 已提交
478 479 480 481
	return
}

// RegisterWorkConn register a new work connection to control and proxies need it.
482
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) error {
F
fatedier 已提交
483
	xl := frpNet.NewLogFromConn(workConn)
F
fatedier 已提交
484
	ctl, exist := svr.ctlManager.GetByID(newMsg.RunID)
F
fatedier 已提交
485
	if !exist {
F
fatedier 已提交
486 487
		xl.Warn("No client control found for run id [%s]", newMsg.RunID)
		return fmt.Errorf("no client control found for run id [%s]", newMsg.RunID)
F
fatedier 已提交
488
	}
489 490 491 492 493
	// server plugin hook
	content := &plugin.NewWorkConnContent{
		User: plugin.UserInfo{
			User:  ctl.loginMsg.User,
			Metas: ctl.loginMsg.Metas,
F
fatedier 已提交
494
			RunID: ctl.loginMsg.RunID,
495 496 497 498 499 500 501 502 503 504
		},
		NewWorkConn: *newMsg,
	}
	retContent, err := svr.pluginManager.NewWorkConn(content)
	if err == nil {
		newMsg = &retContent.NewWorkConn
		// Check auth.
		err = svr.authVerifier.VerifyNewWorkConn(newMsg)
	}
	if err != nil {
F
fatedier 已提交
505
		xl.Warn("invalid NewWorkConn with run id [%s]", newMsg.RunID)
506
		msg.WriteMsg(workConn, &msg.StartWorkConn{
507
			Error: util.GenerateResponseErrorString("invalid NewWorkConn", err, ctl.serverCfg.DetailedErrorsToClient),
508
		})
F
fatedier 已提交
509
		return fmt.Errorf("invalid NewWorkConn with run id [%s]", newMsg.RunID)
510 511
	}
	return ctl.RegisterWorkConn(workConn)
F
fatedier 已提交
512 513
}

F
fatedier 已提交
514
func (svr *Service) RegisterVisitorConn(visitorConn net.Conn, newMsg *msg.NewVisitorConn) error {
F
fatedier 已提交
515
	return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
F
fatedier 已提交
516 517
		newMsg.UseEncryption, newMsg.UseCompression)
}