proxy.go 8.4 KB
Newer Older
F
fatedier 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Copyright 2017 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
F
fatedier 已提交
18
	"context"
F
fatedier 已提交
19 20
	"fmt"
	"io"
F
fatedier 已提交
21 22
	"net"
	"strconv"
F
fatedier 已提交
23 24
	"sync"

F
fatedier 已提交
25 26 27 28 29
	"github.com/fatedier/frp/pkg/config"
	"github.com/fatedier/frp/pkg/msg"
	plugin "github.com/fatedier/frp/pkg/plugin/server"
	frpNet "github.com/fatedier/frp/pkg/util/net"
	"github.com/fatedier/frp/pkg/util/xlog"
F
fatedier 已提交
30
	"github.com/fatedier/frp/server/controller"
31
	"github.com/fatedier/frp/server/metrics"
F
fatedier 已提交
32 33 34 35

	frpIo "github.com/fatedier/golib/io"
)

F
fatedier 已提交
36
type GetWorkConnFn func() (net.Conn, error)
F
fatedier 已提交
37 38

type Proxy interface {
F
fatedier 已提交
39
	Context() context.Context
F
fatedier 已提交
40 41 42
	Run() (remoteAddr string, err error)
	GetName() string
	GetConf() config.ProxyConf
F
fatedier 已提交
43
	GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
F
fatedier 已提交
44
	GetUsedPortsNum() int
45 46
	GetResourceController() *controller.ResourceController
	GetUserInfo() plugin.UserInfo
F
fatedier 已提交
47 48 49 50
	Close()
}

type BaseProxy struct {
51 52 53 54 55 56 57
	name          string
	rc            *controller.ResourceController
	listeners     []net.Listener
	usedPortsNum  int
	poolCount     int
	getWorkConnFn GetWorkConnFn
	serverCfg     config.ServerCommonConf
58
	userInfo      plugin.UserInfo
F
fatedier 已提交
59

F
fatedier 已提交
60 61 62
	mu  sync.RWMutex
	xl  *xlog.Logger
	ctx context.Context
F
fatedier 已提交
63 64 65 66 67 68
}

func (pxy *BaseProxy) GetName() string {
	return pxy.name
}

F
fatedier 已提交
69 70 71 72
func (pxy *BaseProxy) Context() context.Context {
	return pxy.ctx
}

F
fatedier 已提交
73 74 75 76
func (pxy *BaseProxy) GetUsedPortsNum() int {
	return pxy.usedPortsNum
}

77 78 79 80 81 82 83 84
func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
	return pxy.rc
}

func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
	return pxy.userInfo
}

F
fatedier 已提交
85
func (pxy *BaseProxy) Close() {
F
fatedier 已提交
86 87
	xl := xlog.FromContextSafe(pxy.ctx)
	xl.Info("proxy closing")
F
fatedier 已提交
88 89 90 91 92
	for _, l := range pxy.listeners {
		l.Close()
	}
}

F
fatedier 已提交
93 94
// GetWorkConnFromPool try to get a new work connections from pool
// for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
F
fatedier 已提交
95 96
func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
	xl := xlog.FromContextSafe(pxy.ctx)
F
fatedier 已提交
97 98 99
	// try all connections from the pool
	for i := 0; i < pxy.poolCount+1; i++ {
		if workConn, err = pxy.getWorkConnFn(); err != nil {
F
fatedier 已提交
100
			xl.Warn("failed to get work connection: %v", err)
F
fatedier 已提交
101 102
			return
		}
F
fatedier 已提交
103 104
		xl.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
		xl.Spawn().AppendPrefix(pxy.GetName())
F
fatedier 已提交
105
		workConn = frpNet.NewContextConn(pxy.ctx, workConn)
F
fatedier 已提交
106

F
fatedier 已提交
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
		var (
			srcAddr    string
			dstAddr    string
			srcPortStr string
			dstPortStr string
			srcPort    int
			dstPort    int
		)

		if src != nil {
			srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
			srcPort, _ = strconv.Atoi(srcPortStr)
		}
		if dst != nil {
			dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
			dstPort, _ = strconv.Atoi(dstPortStr)
		}
F
fatedier 已提交
124 125
		err := msg.WriteMsg(workConn, &msg.StartWorkConn{
			ProxyName: pxy.GetName(),
F
fatedier 已提交
126 127 128 129
			SrcAddr:   srcAddr,
			SrcPort:   uint16(srcPort),
			DstAddr:   dstAddr,
			DstPort:   uint16(dstPort),
130
			Error:     "",
F
fatedier 已提交
131 132
		})
		if err != nil {
F
fatedier 已提交
133
			xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
F
fatedier 已提交
134 135 136 137 138 139 140
			workConn.Close()
		} else {
			break
		}
	}

	if err != nil {
F
fatedier 已提交
141
		xl.Error("try to get work connection failed in the end")
F
fatedier 已提交
142 143 144 145 146 147 148 149
		return
	}
	return
}

// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, frpNet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
150
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
F
fatedier 已提交
151
	xl := xlog.FromContextSafe(pxy.ctx)
F
fatedier 已提交
152
	for _, listener := range pxy.listeners {
F
fatedier 已提交
153
		go func(l net.Listener) {
F
fatedier 已提交
154 155 156 157 158
			for {
				// block
				// if listener is closed, err returned
				c, err := l.Accept()
				if err != nil {
F
fatedier 已提交
159
					xl.Info("listener is closed")
F
fatedier 已提交
160 161
					return
				}
F
fatedier 已提交
162
				xl.Debug("get a user connection [%s]", c.RemoteAddr().String())
163
				go handler(p, c, pxy.serverCfg)
F
fatedier 已提交
164 165 166 167 168
			}
		}(listener)
	}
}

169
func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
170
	getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
F
fatedier 已提交
171

F
fatedier 已提交
172
	xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
F
fatedier 已提交
173
	basePxy := BaseProxy{
174 175 176 177 178 179 180 181
		name:          pxyConf.GetBaseInfo().ProxyName,
		rc:            rc,
		listeners:     make([]net.Listener, 0),
		poolCount:     poolCount,
		getWorkConnFn: getWorkConnFn,
		serverCfg:     serverCfg,
		xl:            xl,
		ctx:           xlog.NewContext(ctx, xl),
182
		userInfo:      userInfo,
F
fatedier 已提交
183 184
	}
	switch cfg := pxyConf.(type) {
F
fatedier 已提交
185
	case *config.TCPProxyConf:
F
fatedier 已提交
186
		basePxy.usedPortsNum = 1
F
fatedier 已提交
187
		pxy = &TCPProxy{
F
go vet  
fatedier 已提交
188
			BaseProxy: &basePxy,
F
fatedier 已提交
189 190
			cfg:       cfg,
		}
F
fatedier 已提交
191 192
	case *config.TCPMuxProxyConf:
		pxy = &TCPMuxProxy{
193 194 195
			BaseProxy: &basePxy,
			cfg:       cfg,
		}
F
fatedier 已提交
196 197
	case *config.HTTPProxyConf:
		pxy = &HTTPProxy{
F
go vet  
fatedier 已提交
198
			BaseProxy: &basePxy,
F
fatedier 已提交
199 200
			cfg:       cfg,
		}
F
fatedier 已提交
201 202
	case *config.HTTPSProxyConf:
		pxy = &HTTPSProxy{
F
go vet  
fatedier 已提交
203
			BaseProxy: &basePxy,
F
fatedier 已提交
204 205
			cfg:       cfg,
		}
F
fatedier 已提交
206
	case *config.UDPProxyConf:
F
fatedier 已提交
207
		basePxy.usedPortsNum = 1
F
fatedier 已提交
208
		pxy = &UDPProxy{
F
go vet  
fatedier 已提交
209
			BaseProxy: &basePxy,
F
fatedier 已提交
210 211
			cfg:       cfg,
		}
F
fatedier 已提交
212 213
	case *config.STCPProxyConf:
		pxy = &STCPProxy{
F
go vet  
fatedier 已提交
214
			BaseProxy: &basePxy,
F
fatedier 已提交
215 216
			cfg:       cfg,
		}
F
fatedier 已提交
217 218
	case *config.XTCPProxyConf:
		pxy = &XTCPProxy{
F
go vet  
fatedier 已提交
219
			BaseProxy: &basePxy,
F
fatedier 已提交
220 221
			cfg:       cfg,
		}
F
fatedier 已提交
222 223
	case *config.SUDPProxyConf:
		pxy = &SUDPProxy{
T
Tank 已提交
224 225 226
			BaseProxy: &basePxy,
			cfg:       cfg,
		}
F
fatedier 已提交
227 228 229 230 231 232
	default:
		return pxy, fmt.Errorf("proxy type not support")
	}
	return
}

F
fatedier 已提交
233
// HandleUserTCPConnection is used for incoming user TCP connections.
F
fatedier 已提交
234
// It can be used for tcp, http, https type.
F
fatedier 已提交
235
func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
F
fatedier 已提交
236
	xl := xlog.FromContextSafe(pxy.Context())
F
fatedier 已提交
237 238
	defer userConn.Close()

239 240 241 242 243 244 245 246 247 248 249 250 251 252
	// server plugin hook
	rc := pxy.GetResourceController()
	content := &plugin.NewUserConnContent{
		User:       pxy.GetUserInfo(),
		ProxyName:  pxy.GetName(),
		ProxyType:  pxy.GetConf().GetBaseInfo().ProxyType,
		RemoteAddr: userConn.RemoteAddr().String(),
	}
	_, err := rc.PluginManager.NewUserConn(content)
	if err != nil {
		xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
		return
	}

F
fatedier 已提交
253
	// try all connections from the pool
F
fatedier 已提交
254
	workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
F
fatedier 已提交
255 256 257 258 259 260 261
	if err != nil {
		return
	}
	defer workConn.Close()

	var local io.ReadWriteCloser = workConn
	cfg := pxy.GetConf().GetBaseInfo()
F
fatedier 已提交
262
	xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
F
fatedier 已提交
263
	if cfg.UseEncryption {
264
		local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
F
fatedier 已提交
265
		if err != nil {
F
fatedier 已提交
266
			xl.Error("create encryption stream error: %v", err)
F
fatedier 已提交
267 268 269 270 271 272
			return
		}
	}
	if cfg.UseCompression {
		local = frpIo.WithCompression(local)
	}
F
fatedier 已提交
273
	xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
F
fatedier 已提交
274 275
		workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())

276 277 278
	name := pxy.GetName()
	proxyType := pxy.GetConf().GetBaseInfo().ProxyType
	metrics.Server.OpenConnection(name, proxyType)
F
fatedier 已提交
279
	inCount, outCount := frpIo.Join(local, userConn)
280 281 282
	metrics.Server.CloseConnection(name, proxyType)
	metrics.Server.AddTrafficIn(name, proxyType, inCount)
	metrics.Server.AddTrafficOut(name, proxyType, outCount)
F
fatedier 已提交
283
	xl.Debug("join connections closed")
F
fatedier 已提交
284 285
}

F
fatedier 已提交
286
type Manager struct {
F
fatedier 已提交
287 288 289 290 291 292
	// proxies indexed by proxy name
	pxys map[string]Proxy

	mu sync.RWMutex
}

F
fatedier 已提交
293 294
func NewManager() *Manager {
	return &Manager{
F
fatedier 已提交
295 296 297 298
		pxys: make(map[string]Proxy),
	}
}

F
fatedier 已提交
299
func (pm *Manager) Add(name string, pxy Proxy) error {
F
fatedier 已提交
300 301 302 303 304 305 306 307 308 309
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if _, ok := pm.pxys[name]; ok {
		return fmt.Errorf("proxy name [%s] is already in use", name)
	}

	pm.pxys[name] = pxy
	return nil
}

F
fatedier 已提交
310
func (pm *Manager) Del(name string) {
F
fatedier 已提交
311 312 313 314 315
	pm.mu.Lock()
	defer pm.mu.Unlock()
	delete(pm.pxys, name)
}

F
fatedier 已提交
316
func (pm *Manager) GetByName(name string) (pxy Proxy, ok bool) {
F
fatedier 已提交
317 318 319 320 321
	pm.mu.RLock()
	defer pm.mu.RUnlock()
	pxy, ok = pm.pxys[name]
	return
}