service.go 10.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

F
fatedier 已提交
15 16 17
package server

import (
F
fatedier 已提交
18
	"bytes"
F
fatedier 已提交
19
	"fmt"
F
fatedier 已提交
20
	"io/ioutil"
F
fatedier 已提交
21 22
	"net"
	"net/http"
F
fatedier 已提交
23 24 25
	"time"

	"github.com/fatedier/frp/assets"
F
fatedier 已提交
26
	"github.com/fatedier/frp/g"
F
fatedier 已提交
27
	"github.com/fatedier/frp/models/msg"
F
fatedier 已提交
28 29
	"github.com/fatedier/frp/server/group"
	"github.com/fatedier/frp/server/ports"
F
fatedier 已提交
30
	"github.com/fatedier/frp/utils/log"
31
	frpNet "github.com/fatedier/frp/utils/net"
F
fatedier 已提交
32 33 34
	"github.com/fatedier/frp/utils/util"
	"github.com/fatedier/frp/utils/version"
	"github.com/fatedier/frp/utils/vhost"
35

F
fatedier 已提交
36
	"github.com/fatedier/golib/net/mux"
F
fatedier 已提交
37
	fmux "github.com/hashicorp/yamux"
F
fatedier 已提交
38 39
)

F
fatedier 已提交
40 41 42 43
const (
	connReadTimeout time.Duration = 10 * time.Second
)

44 45
var ServerService *Service

46
// Server service
F
fatedier 已提交
47
type Service struct {
48
	// Dispatch connections to different handlers listen on same port
49 50
	muxer *mux.Mux

51
	// Accept connections from client
52
	listener frpNet.Listener
F
fatedier 已提交
53

54
	// Accept connections using kcp
F
fatedier 已提交
55 56
	kcpListener frpNet.Listener

F
FishFish 已提交
57 58 59
	// Accept connections using websocket
	websocketListener frpNet.Listener

60
	// For https proxies, route requests to different clients by hostname and other infomation
F
fatedier 已提交
61 62
	VhostHttpsMuxer *vhost.HttpsMuxer

F
fatedier 已提交
63 64
	httpReverseProxy *vhost.HttpReverseProxy

65
	// Manage all controllers
F
fatedier 已提交
66 67
	ctlManager *ControlManager

68
	// Manage all proxies
F
fatedier 已提交
69
	pxyManager *ProxyManager
F
fatedier 已提交
70

71
	// Manage all visitor listeners
F
fatedier 已提交
72
	visitorManager *VisitorManager
F
fatedier 已提交
73

74
	// Manage all tcp ports
F
fatedier 已提交
75
	tcpPortManager *ports.PortManager
76

77
	// Manage all udp ports
F
fatedier 已提交
78
	udpPortManager *ports.PortManager
79

80
	// Tcp Group Controller
F
fatedier 已提交
81
	tcpGroupCtl *group.TcpGroupCtl
82 83

	// Controller for nat hole connections
F
fatedier 已提交
84
	natHoleController *NatHoleController
F
fatedier 已提交
85 86 87
}

func NewService() (svr *Service, err error) {
F
fatedier 已提交
88
	cfg := &g.GlbServerCfg.ServerCommonConf
F
fatedier 已提交
89
	svr = &Service{
F
fatedier 已提交
90 91 92
		ctlManager:     NewControlManager(),
		pxyManager:     NewProxyManager(),
		visitorManager: NewVisitorManager(),
F
fatedier 已提交
93 94
		tcpPortManager: ports.NewPortManager("tcp", cfg.ProxyBindAddr, cfg.AllowPorts),
		udpPortManager: ports.NewPortManager("udp", cfg.ProxyBindAddr, cfg.AllowPorts),
F
fatedier 已提交
95
	}
F
fatedier 已提交
96
	svr.tcpGroupCtl = group.NewTcpGroupCtl(svr.tcpPortManager)
F
fatedier 已提交
97 98

	// Init assets.
F
fatedier 已提交
99
	err = assets.Load(cfg.AssetsDir)
F
fatedier 已提交
100 101 102 103 104
	if err != nil {
		err = fmt.Errorf("Load assets error: %v", err)
		return
	}

105 106 107 108 109 110 111 112 113 114 115 116 117
	var (
		httpMuxOn  bool
		httpsMuxOn bool
	)
	if cfg.BindAddr == cfg.ProxyBindAddr {
		if cfg.BindPort == cfg.VhostHttpPort {
			httpMuxOn = true
		}
		if cfg.BindPort == cfg.VhostHttpsPort {
			httpsMuxOn = true
		}
	}

F
fatedier 已提交
118
	// Listen for accepting connections from client.
119
	ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindPort))
F
fatedier 已提交
120 121 122 123
	if err != nil {
		err = fmt.Errorf("Create server listener error, %v", err)
		return
	}
F
FishFish 已提交
124

F
fix ci  
fatedier 已提交
125 126
	svr.muxer = mux.NewMux(ln)
	go svr.muxer.Serve()
F
FishFish 已提交
127 128
	ln = svr.muxer.DefaultListener()

129
	svr.listener = frpNet.WrapLogListener(ln)
F
fatedier 已提交
130
	log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort)
F
fatedier 已提交
131 132

	// Listen for accepting connections from client using kcp protocol.
F
fatedier 已提交
133 134
	if cfg.KcpBindPort > 0 {
		svr.kcpListener, err = frpNet.ListenKcp(cfg.BindAddr, cfg.KcpBindPort)
F
fatedier 已提交
135
		if err != nil {
F
fatedier 已提交
136
			err = fmt.Errorf("Listen on kcp address udp [%s:%d] error: %v", cfg.BindAddr, cfg.KcpBindPort, err)
F
fatedier 已提交
137 138
			return
		}
F
fatedier 已提交
139
		log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.KcpBindPort)
F
fatedier 已提交
140
	}
F
fatedier 已提交
141

F
fatedier 已提交
142
	// Listen for accepting connections from client using websocket protocol.
F
fatedier 已提交
143
	websocketPrefix := []byte("GET " + frpNet.FrpWebsocketPath)
F
fatedier 已提交
144 145 146 147 148
	websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
		return bytes.Equal(data, websocketPrefix)
	})
	svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)

F
fatedier 已提交
149
	// Create http vhost muxer.
F
fatedier 已提交
150
	if cfg.VhostHttpPort > 0 {
F
fatedier 已提交
151 152 153
		rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{
			ResponseHeaderTimeoutS: cfg.VhostHttpTimeout,
		})
F
fatedier 已提交
154 155 156 157 158 159
		svr.httpReverseProxy = rp

		address := fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
		server := &http.Server{
			Addr:    address,
			Handler: rp,
F
fatedier 已提交
160
		}
F
fatedier 已提交
161
		var l net.Listener
162
		if httpMuxOn {
F
fatedier 已提交
163
			l = svr.muxer.ListenHttp(1)
164 165 166 167 168 169
		} else {
			l, err = net.Listen("tcp", address)
			if err != nil {
				err = fmt.Errorf("Create vhost http listener error, %v", err)
				return
			}
F
fatedier 已提交
170
		}
F
fatedier 已提交
171
		go server.Serve(l)
F
fatedier 已提交
172
		log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
F
fatedier 已提交
173 174 175
	}

	// Create https vhost muxer.
F
fatedier 已提交
176
	if cfg.VhostHttpsPort > 0 {
177 178
		var l net.Listener
		if httpsMuxOn {
F
fatedier 已提交
179
			l = svr.muxer.ListenHttps(1)
180 181 182 183 184 185
		} else {
			l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort))
			if err != nil {
				err = fmt.Errorf("Create server listener error, %v", err)
				return
			}
F
fatedier 已提交
186
		}
187 188

		svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
F
fatedier 已提交
189 190 191 192
		if err != nil {
			err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
			return
		}
F
fatedier 已提交
193 194 195 196 197 198 199 200 201 202 203 204 205 206
		log.Info("https service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort)
	}

	// Create nat hole controller.
	if cfg.BindUdpPort > 0 {
		var nc *NatHoleController
		addr := fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindUdpPort)
		nc, err = NewNatHoleController(addr)
		if err != nil {
			err = fmt.Errorf("Create nat hole controller error, %v", err)
			return
		}
		svr.natHoleController = nc
		log.Info("nat hole udp service listen on %s:%d", cfg.BindAddr, cfg.BindUdpPort)
F
fatedier 已提交
207 208 209
	}

	// Create dashboard web server.
F
fatedier 已提交
210
	if cfg.DashboardPort > 0 {
T
timerever 已提交
211
		err = RunDashboardServer(cfg.DashboardAddr, cfg.DashboardPort)
F
fatedier 已提交
212 213 214 215
		if err != nil {
			err = fmt.Errorf("Create dashboard web server error, %v", err)
			return
		}
T
timerever 已提交
216
		log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
F
fatedier 已提交
217
	}
F
FishFish 已提交
218

F
fatedier 已提交
219 220 221 222
	return
}

func (svr *Service) Run() {
F
fatedier 已提交
223 224 225
	if svr.natHoleController != nil {
		go svr.natHoleController.Run()
	}
F
fatedier 已提交
226
	if g.GlbServerCfg.KcpBindPort > 0 {
F
fatedier 已提交
227 228 229
		go svr.HandleListener(svr.kcpListener)
	}

F
fatedier 已提交
230 231
	go svr.HandleListener(svr.websocketListener)

F
fatedier 已提交
232 233 234 235
	svr.HandleListener(svr.listener)
}

func (svr *Service) HandleListener(l frpNet.Listener) {
F
fatedier 已提交
236 237
	// Listen for incoming connections from client.
	for {
F
fatedier 已提交
238
		c, err := l.Accept()
F
fatedier 已提交
239 240 241 242 243 244
		if err != nil {
			log.Warn("Listener for incoming connections from client closed")
			return
		}

		// Start a new goroutine for dealing connections.
245 246 247 248 249
		go func(frpConn frpNet.Conn) {
			dealFn := func(conn frpNet.Conn) {
				var rawMsg msg.Message
				conn.SetReadDeadline(time.Now().Add(connReadTimeout))
				if rawMsg, err = msg.ReadMsg(conn); err != nil {
F
fatedier 已提交
250
					log.Trace("Failed to read message: %v", err)
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
					conn.Close()
					return
				}
				conn.SetReadDeadline(time.Time{})

				switch m := rawMsg.(type) {
				case *msg.Login:
					err = svr.RegisterControl(conn, m)
					// If login failed, send error message there.
					// Otherwise send success message in control's work goroutine.
					if err != nil {
						conn.Warn("%v", err)
						msg.WriteMsg(conn, &msg.LoginResp{
							Version: version.Full(),
							Error:   err.Error(),
						})
						conn.Close()
					}
				case *msg.NewWorkConn:
					svr.RegisterWorkConn(conn, m)
F
fatedier 已提交
271 272
				case *msg.NewVisitorConn:
					if err = svr.RegisterVisitorConn(conn, m); err != nil {
F
fatedier 已提交
273
						conn.Warn("%v", err)
F
fatedier 已提交
274
						msg.WriteMsg(conn, &msg.NewVisitorConnResp{
F
fatedier 已提交
275 276 277 278 279
							ProxyName: m.ProxyName,
							Error:     err.Error(),
						})
						conn.Close()
					} else {
F
fatedier 已提交
280
						msg.WriteMsg(conn, &msg.NewVisitorConnResp{
F
fatedier 已提交
281 282 283 284
							ProxyName: m.ProxyName,
							Error:     "",
						})
					}
285 286 287 288
				default:
					log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
					conn.Close()
				}
F
fatedier 已提交
289 290
			}

F
fatedier 已提交
291
			if g.GlbServerCfg.TcpMux {
F
fatedier 已提交
292
				fmuxCfg := fmux.DefaultConfig()
F
fatedier 已提交
293
				fmuxCfg.KeepAliveInterval = 20 * time.Second
F
fatedier 已提交
294 295
				fmuxCfg.LogOutput = ioutil.Discard
				session, err := fmux.Server(frpConn, fmuxCfg)
F
fatedier 已提交
296
				if err != nil {
297
					log.Warn("Failed to create mux connection: %v", err)
F
fatedier 已提交
298
					frpConn.Close()
299 300 301 302 303 304
					return
				}

				for {
					stream, err := session.AcceptStream()
					if err != nil {
305
						log.Debug("Accept new mux stream error: %v", err)
F
fatedier 已提交
306
						session.Close()
307 308 309 310
						return
					}
					wrapConn := frpNet.WrapConn(stream)
					go dealFn(wrapConn)
F
fatedier 已提交
311
				}
312 313
			} else {
				dealFn(frpConn)
F
fatedier 已提交
314 315 316 317 318
			}
		}(c)
	}
}

319
func (svr *Service) RegisterControl(ctlConn frpNet.Conn, loginMsg *msg.Login) (err error) {
F
fatedier 已提交
320 321 322 323 324 325 326 327 328 329 330
	ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
		ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)

	// Check client version.
	if ok, msg := version.Compat(loginMsg.Version); !ok {
		err = fmt.Errorf("%s", msg)
		return
	}

	// Check auth.
	nowTime := time.Now().Unix()
F
fatedier 已提交
331
	if g.GlbServerCfg.AuthTimeout != 0 && nowTime-loginMsg.Timestamp > g.GlbServerCfg.AuthTimeout {
F
fatedier 已提交
332 333 334
		err = fmt.Errorf("authorization timeout")
		return
	}
F
fatedier 已提交
335
	if util.GetAuthKey(g.GlbServerCfg.Token, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
F
fatedier 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
		err = fmt.Errorf("authorization failed")
		return
	}

	// If client's RunId is empty, it's a new client, we just create a new controller.
	// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
	if loginMsg.RunId == "" {
		loginMsg.RunId, err = util.RandId()
		if err != nil {
			return
		}
	}

	ctl := NewControl(svr, ctlConn, loginMsg)

	if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
F
fatedier 已提交
352
		oldCtl.allShutdown.WaitDone()
F
fatedier 已提交
353 354 355 356
	}

	ctlConn.AddLogPrefix(loginMsg.RunId)
	ctl.Start()
357 358 359

	// for statistics
	StatsNewClient()
F
fatedier 已提交
360 361 362 363
	return
}

// RegisterWorkConn register a new work connection to control and proxies need it.
364
func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkConn) {
F
fatedier 已提交
365 366 367 368 369 370 371 372 373
	ctl, exist := svr.ctlManager.GetById(newMsg.RunId)
	if !exist {
		workConn.Warn("No client control found for run id [%s]", newMsg.RunId)
		return
	}
	ctl.RegisterWorkConn(workConn)
	return
}

F
fatedier 已提交
374 375
func (svr *Service) RegisterVisitorConn(visitorConn frpNet.Conn, newMsg *msg.NewVisitorConn) error {
	return svr.visitorManager.NewConn(newMsg.ProxyName, visitorConn, newMsg.Timestamp, newMsg.SignKey,
F
fatedier 已提交
376 377 378
		newMsg.UseEncryption, newMsg.UseCompression)
}

F
fatedier 已提交
379
func (svr *Service) RegisterProxy(name string, pxy Proxy) error {
F
fatedier 已提交
380
	return svr.pxyManager.Add(name, pxy)
F
fatedier 已提交
381 382 383 384 385
}

func (svr *Service) DelProxy(name string) {
	svr.pxyManager.Del(name)
}