service.go 10.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15 16 17
package server

import (
	"bytes"
	"crypto/subtle"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"time"

	"github.com/fatedier/frp/assets"
	"github.com/fatedier/frp/g"
	"github.com/fatedier/frp/models/msg"
	"github.com/fatedier/frp/server/group"
	"github.com/fatedier/frp/server/ports"
	"github.com/fatedier/frp/utils/log"
	frpNet "github.com/fatedier/frp/utils/net"
	"github.com/fatedier/frp/utils/util"
	"github.com/fatedier/frp/utils/version"
	"github.com/fatedier/frp/utils/vhost"

	"github.com/fatedier/golib/net/mux"
	fmux "github.com/hashicorp/yamux"
)

F
fatedier 已提交
40 41 42 43
// connReadTimeout bounds how long the server waits for the first message on a
// newly accepted connection before giving up on it.
const connReadTimeout = 10 * time.Second

44 45
// ServerService is a package-level handle to a *Service. Nothing in this part
// of the file assigns it; presumably the program entry point stores the value
// returned by NewService here — TODO confirm against the caller.
var ServerService *Service

F
fatedier 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
// ResourceController bundles all resource managers and controllers of the
// server. NewService creates one and passes it to every new Control.
type ResourceController struct {
	// CtlManager manages all client controllers.
	CtlManager *ControlManager

	// PxyManager manages all proxies.
	PxyManager *ProxyManager

	// VisitorManager manages all visitor listeners.
	VisitorManager *VisitorManager

	// TcpGroupCtl is the tcp group controller; NewService builds it on top of
	// TcpPortManager.
	TcpGroupCtl *group.TcpGroupCtl

	// TcpPortManager manages all tcp ports.
	TcpPortManager *ports.PortManager

	// UdpPortManager manages all udp ports.
	UdpPortManager *ports.PortManager

	// HttpReverseProxy forwards http requests for http proxies. NewService
	// only sets it when a vhost http port is configured, so it may be nil.
	HttpReverseProxy *vhost.HttpReverseProxy

	// VhostHttpsMuxer routes https requests to different clients by hostname
	// and other information. NewService only sets it when a vhost https port
	// is configured, so it may be nil.
	VhostHttpsMuxer *vhost.HttpsMuxer

	// NatHoleController is the controller for nat hole connections. NewService
	// only sets it when a udp bind port is configured, so it may be nil.
	NatHoleController *NatHoleController
}

76
// Server service
F
fatedier 已提交
77
type Service struct {
78
	// Dispatch connections to different handlers listen on same port
79 80
	muxer *mux.Mux

81
	// Accept connections from client
82
	listener frpNet.Listener
F
fatedier 已提交
83

84
	// Accept connections using kcp
F
fatedier 已提交
85 86
	kcpListener frpNet.Listener

F
FishFish 已提交
87 88 89
	// Accept connections using websocket
	websocketListener frpNet.Listener

F
fatedier 已提交
90 91
	// All resource managers and controllers
	rc *ResourceController
F
fatedier 已提交
92 93 94
}

func NewService() (svr *Service, err error) {
F
fatedier 已提交
95
	cfg := &g.GlbServerCfg.ServerCommonConf
F
fatedier 已提交
96
	svr = &Service{
F
fatedier 已提交
97 98 99 100 101 102 103
		rc: &ResourceController{
			CtlManager:     NewControlManager(),
			PxyManager:     NewProxyManager(),
			VisitorManager: NewVisitorManager(),
			TcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
			UdpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
		},
F
fatedier 已提交
104
	}
F
fatedier 已提交
105
	svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager)
F
fatedier 已提交
106 107

	// Init assets.
F
fatedier 已提交
108
	err = assets.Load(cfg.AssetsDir)
F
fatedier 已提交
109 110 111 112 113
	if err != nil {
		err = fmt.Errorf("Load assets error: %v", err)
		return
	}

114 115 116 117 118 119 120 121 122 123 124 125 126
	var (
		httpMuxOn  bool
		httpsMuxOn bool
	)
	if cfg.BindAddr == cfg.ProxyBindAddr {
		if cfg.BindPort == cfg.VhostHttpPort {
			httpMuxOn = true
		}
		if cfg.BindPort == cfg.VhostHttpsPort {
			httpsMuxOn = true
		}
	}

F
fatedier 已提交
127
	// Listen for accepting connections from client.
128
	ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindPort))
F
fatedier 已提交
129 130 131 132
	if err != nil {
		err = fmt.Errorf("Create server listener error, %v", err)
		return
	}
F
FishFish 已提交
133

F
fix ci  
fatedier 已提交
134 135
	svr.muxer = mux.NewMux(ln)
	go svr.muxer.Serve()
F
FishFish 已提交
136 137
	ln = svr.muxer.DefaultListener()

138
	svr.listener = frpNet.WrapLogListener(ln)
F
fatedier 已提交
139
	log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort)
F
fatedier 已提交
140 141

	// Listen for accepting connections from client using kcp protocol.
F
fatedier 已提交
142 143
	if cfg.KcpBindPort > 0 {
		svr.kcpListener, err = frpNet.ListenKcp(cfg.BindAddr, cfg.KcpBindPort)
F
fatedier 已提交
144
		if err != nil {
F
fatedier 已提交
145
			err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", cfg.BindAddr, cfg.KcpBindPort, err)
F
fatedier 已提交
146 147
			return
		}
F
fatedier 已提交
148
		log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.KcpBindPort)
F
fatedier 已提交
149
	}
F
fatedier 已提交
150

F
fatedier 已提交
151
	// Listen for accepting connections from client using websocket protocol.
F
fatedier 已提交
152
	websocketPrefix := []byte("GET " + frpNet.FrpWebsocketPath)
F
fatedier 已提交
153 154 155 156 157
	websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
		return bytes.Equal(data, websocketPrefix)
	})
	svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)

F
fatedier 已提交
158
	// Create http vhost muxer.
F
fatedier 已提交
159
	if cfg.VhostHttpPort > 0 {
F
fatedier 已提交
160 161 162
		rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{
			ResponseHeaderTimeoutS: cfg.VhostHttpTimeout,
		})
F
fatedier 已提交
163
		svr.rc.HttpReverseProxy = rp
F
fatedier 已提交
164 165 166 167 168

		address := fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
		server := &http.Server{
			Addr:    address,
			Handler: rp,
F
fatedier 已提交
169
		}
F
fatedier 已提交
170
		var l net.Listener
171
		if httpMuxOn {
F
fatedier 已提交
172
			l = svr.muxer.ListenHttp(1)
173 174 175 176 177 178
		} else {
			l, err = net.Listen("tcp", address)
			if err != nil {
				err = fmt.Errorf("Create vhost http listener error, %v", err)
				return
			}
F
fatedier 已提交
179
		}
F
fatedier 已提交
180
		go server.Serve(l)
F
fatedier 已提交
181
		log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
F
fatedier 已提交
182 183 184
	}

	// Create https vhost muxer.
F
fatedier 已提交
185
	if cfg.VhostHttpsPort > 0 {
186 187
		var l net.Listener
		if httpsMuxOn {
F
fatedier 已提交
188
			l = svr.muxer.ListenHttps(1)
189 190 191 192 193 194
		} else {
			l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort))
			if err != nil {
				err = fmt.Errorf("Create server listener error, %v", err)
				return
			}
F
fatedier 已提交
195
		}
196

F
fatedier 已提交
197
		svr.rc.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
F
fatedier 已提交
198 199 200 201
		if err != nil {
			err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
			return
		}
F
fatedier 已提交
202 203 204 205 206 207 208 209 210 211 212 213
		log.Info("https service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort)
	}

	// Create nat hole controller.
	if cfg.BindUdpPort > 0 {
		var nc *NatHoleController
		addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort)
		nc, err = NewNatHoleController(addr)
		if err != nil {
			err = fmt.Errorf("Create nat hole controller error, %v", err)
			return
		}
F
fatedier 已提交
214
		svr.rc.NatHoleController = nc
F
fatedier 已提交
215
		log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
F
fatedier 已提交
216 217 218
	}

	// Create dashboard web server.
F
fatedier 已提交
219
	if cfg.DashboardPort > 0 {
F
fatedier 已提交
220
		err = svr.RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
F
fatedier 已提交
221 222 223 224
		if err != nil {
			err = fmt.Errorf("Create dashboard web server error, %v", err)
			return
		}
T
timerever 已提交
225
		log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
F
fatedier 已提交
226
	}
F
FishFish 已提交
227

F
fatedier 已提交
228 229 230 231
	return
}

func (svr *Service) Run() {
F
fatedier 已提交
232 233
	if svr.rc.NatHoleController != nil {
		go svr.rc.NatHoleController.Run()
F
fatedier 已提交
234
	}
F
fatedier 已提交
235
	if g.GlbServerCfg.KcpBindPort > 0 {
F
fatedier 已提交
236 237 238
		go svr.HandleListener(svr.kcpListener)
	}

F
fatedier 已提交
239 240
	go svr.HandleListener(svr.websocketListener)

F
fatedier 已提交
241 242 243 244
	svr.HandleListener(svr.listener)
}

func (svr *Service) HandleListener(l frpNet.Listener) {
F
fatedier 已提交
245 246
	// Listen for incoming connections from client.
	for {
F
fatedier 已提交
247
		c, err := l.Accept()
F
fatedier 已提交
248 249 250 251 252 253
		if err != nil {
			log.Warn("Listener for incoming connections from client closed")
			return
		}

		// Start a new goroutine for dealing connections.
254 255 256 257 258
		go func(frpConn frpNet.Conn) {
			dealFn := func(conn frpNet.Conn) {
				var rawMsg msg.Message
				conn.SetReadDeadline(time.Now().Add(connReadTimeout))
				if rawMsg, err = msg.ReadMsg(conn); err != nil {
F
fatedier 已提交
259
					log.Trace("Failed to read message: %v", err)
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
					conn.Close()
					return
				}
				conn.SetReadDeadline(time.Time{})

				switch m := rawMsg.(type) {
				case *msg.Login:
					err = svr.RegisterControl(conn, m)
					// If login failed, send error message there.
					// Otherwise send success message in control's work goroutine.
					if err != nil {
						conn.Warn("%v", err)
						msg.WriteMsg(conn, &msg.LoginResp{
							Version: version.Full(),
							Error:   err.Error(),
						})
						conn.Close()
					}
				case *msg.NewWorkConn:
					svr.RegisterWorkConn(conn, m)
F
fatedier 已提交
280 281
				case *msg.NewVisitorConn:
					if err = svr.RegisterVisitorConn(conn, m); err != nil {
F
fatedier 已提交
282
						conn.Warn("%v", err)
F
fatedier 已提交
283
						msg.WriteMsg(conn, &msg.NewVisitorConnResp{
F
fatedier 已提交
284 285 286 287 288
							ProxyName: m.ProxyName,
							Error:     err.Error(),
						})
						conn.Close()
					} else {
F
fatedier 已提交
289
						msg.WriteMsg(conn, &msg.NewVisitorConnResp{
F
fatedier 已提交
290 291 292 293
							ProxyName: m.ProxyName,
							Error:     "",
						})
					}
294 295 296 297
				default:
					log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
					conn.Close()
				}
F
fatedier 已提交
298 299
			}

F
fatedier 已提交
300
			if g.GlbServerCfg.TcpMux {
F
fatedier 已提交
301
				fmuxCfg := fmux.DefaultConfig()
F
fatedier 已提交
302
				fmuxCfg.KeepAliveInterval = 20 * time.Second
F
fatedier 已提交
303 304
				fmuxCfg.LogOutput = ioutil.Discard
				session, err := fmux.Server(frpConn, fmuxCfg)
F
fatedier 已提交
305
				if err != nil {
306
					log.Warn("Failed to create mux connection: %v", err)
F
fatedier 已提交
307
					frpConn.Close()
308 309 310 311 312 313
					return
				}

				for {
					stream, err := session.AcceptStream()
					if err != nil {
314
						log.Debug("Accept new mux stream error: %v", err)
F
fatedier 已提交
315
						session.Close()
316 317 318 319
						return
					}
					wrapConn := frpNet.WrapConn(stream)
					go dealFn(wrapConn)
F
fatedier 已提交
320
				}
321 322
			} else {
				dealFn(frpConn)
F
fatedier 已提交
323 324 325 326 327
			}
		}(c)
	}
}

328
func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (err error) {
F
fatedier 已提交
329 330 331 332 333 334 335 336 337 338 339
	ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
		ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)

	// Check client version.
	if ok, msg := version.Compat(loginMsg.Version); !ok {
		err = fmt.Errorf("%s", msg)
		return
	}

	// Check auth.
	nowTime := time.Now().Unix()
F
fatedier 已提交
340
	if g.GlbServerCfg.AuthTimeout != 0 && nowTime-loginMsg.Timestamp > g.GlbServerCfg.AuthTimeout {
F
fatedier 已提交
341 342 343
		err = fmt.Errorf("authorization timeout")
		return
	}
F
fatedier 已提交
344
	if util.GetAuthKey(g.GlbServerCfg.Token, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
F
fatedier 已提交
345 346 347 348 349 350 351 352 353 354 355 356 357
		err = fmt.Errorf("authorization failed")
		return
	}

	// If client's RunId is empty, it's a new client, we just create a new controller.
	// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
	if loginMsg.RunId == "" {
		loginMsg.RunId, err = util.RandId()
		if err != nil {
			return
		}
	}

F
fatedier 已提交
358
	ctl := NewControl(svr.rc, ctlConn, loginMsg)
F
fatedier 已提交
359

F
fatedier 已提交
360
	if oldCtl := svr.rc.CtlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
F
fatedier 已提交
361
		oldCtl.allShutdown.WaitDone()
F
fatedier 已提交
362 363 364 365
	}

	ctlConn.AddLogPrefix(loginMsg.RunId)
	ctl.Start()
366 367 368

	// for statistics
	StatsNewClient()
F
fatedier 已提交
369 370 371 372
	return
}

// RegisterWorkConn register a new work connection to control and proxies need it.
373
func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) {
F
fatedier 已提交
374
	ctl, exist := svr.rc.CtlManager.GetById(newMsg.RunId)
F
fatedier 已提交
375 376 377 378 379 380 381 382
	if !exist {
		workConn.Warn("No client control found for run id [%s]", newMsg.RunId)
		return
	}
	ctl.RegisterWorkConn(workConn)
	return
}

F
fatedier 已提交
383
func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error {
F
fatedier 已提交
384
	return svr.rc.VisitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
F
fatedier 已提交
385 386
		newMsg.UseEncryption, newMsg.UseCompression)
}