内网穿透反向代理工具frp实现TCP协议代理源码分析

0 363
索鸟 2021-01-27
需要:0索币

frps

cmd/frps/main.go 是frps的入口处,我们从这里开始,main 函数的主体:

func main() {
	crypto.DefaultSalt = "frp"

	Execute()
}

因此我们需要看到 Execute() 函数的内容,其实它是使用了 cobra 这个库,所以实际的入口在

var rootCmd = &cobra.Command{
        Use:   "frps",
        Short: "frps is the server of frp (https://github.com/fatedier/frp)",
        RunE: func(cmd *cobra.Command, args []string) error {
                if showVersion {
                        fmt.Println(version.Full())
                        return nil
                }

                var err error
                if cfgFile != "" {
                        var content string
                        content, err = config.GetRenderedConfFromFile(cfgFile)
                        if err != nil {
                                return err
                        }
                        g.GlbServerCfg.CfgFile = cfgFile
                        err = parseServerCommonCfg(CfgFileTypeIni, content)
                } else {
                        err = parseServerCommonCfg(CfgFileTypeCmd, "")
                }
                if err != nil {
                        return err
                }

                err = runServer()
                if err != nil {
                        fmt.Println(err)
                        os.Exit(1)
                }
                return nil
        },
}

最终,也就是 runServer() 这个函数:

func runServer() (err error) {
        log.InitLog(g.GlbServerCfg.LogWay, g.GlbServerCfg.LogFile, g.GlbServerCfg.LogLevel,
                g.GlbServerCfg.LogMaxDays)
        svr, err := server.NewService()
        if err != nil {
                return err
        }
        log.Info("Start frps success")
        server.ServerService = svr
        svr.Run()
        return
}

svr.Run(),其中的 svr 是来自 server.NewService(),仔细看一下,server.NewService() 其实就是初始化了一大堆东西。
我们直接看 svr.Run() 做了什么:

func (svr *Service) Run() {
        if svr.rc.NatHoleController != nil {
                go svr.rc.NatHoleController.Run()
        }
        if g.GlbServerCfg.KcpBindPort > 0 {
                go svr.HandleListener(svr.kcpListener)
        }

        go svr.HandleListener(svr.websocketListener)
        go svr.HandleListener(svr.tlsListener)

        svr.HandleListener(svr.listener)
}

可以看到,最后frps会执行到 svr.HandleListener(svr.listener),前面的都是什么 nat hole punching, kcp, websocket, tls等等,我们不看。直接看tcp。

func (svr *Service) HandleListener(l frpNet.Listener) {
	// Listen for incoming connections from client.
	for {
		c, err := l.Accept()
		if err != nil {
			log.Warn("Listener for incoming connections from client closed")
			return
		}
		c = frpNet.CheckAndEnableTLSServerConn(c, svr.tlsConfig)

		// Start a new goroutine for dealing connections.
		go func(frpConn frpNet.Conn) {
        ...
        }
    }
}

这里就是监听之后,每来一个新的连接,就起一个goroutine去处理,也就是 go func()... 这一段,然后我们看看内容:

switch m := rawMsg.(type) {
case *msg.Login:
    err = svr.RegisterControl(conn, m)
    // If login failed, send error message there.
    // Otherwise send success message in control's work goroutine.
    if err != nil {
        conn.Warn("%v", err)
        msg.WriteMsg(conn, &msg.LoginResp{
            Version: version.Full(),
            Error:   err.Error(),
        })
        conn.Close()
    }
case *msg.NewWorkConn:
    svr.RegisterWorkConn(conn, m)
case *msg.NewVisitorConn:
    if err = svr.RegisterVisitorConn(conn, m); err != nil {
        conn.Warn("%v", err)
        msg.WriteMsg(conn, &msg.NewVisitorConnResp{
            ProxyName: m.ProxyName,
            Error:     err.Error(),
        })
        conn.Close()
    } else {
        msg.WriteMsg(conn, &msg.NewVisitorConnResp{
            ProxyName: m.ProxyName,
            Error:     "",
        })
    }
default:
    log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
    conn.Close()
}

这就是服务端启动之后,卡住的地方了。客户端建立连接之后,会发送一个消息,它的类型可能是 msg.Loginmsg.NewWorkConnmsg.NewVisitorConn。上一篇我们说了,visitor
是用于stcp也就是端对端加密通信的,我们不看。workConn就是用于转发流量的,Login就是新的客户端连上去之后进行启动。

frpc

同样,我们从 cmd/frpc/main.go 看起:

func main() {
        crypto.DefaultSalt = "frp"

        sub.Execute()
}

跳转到 sub.Execute()

var rootCmd = &cobra.Command{
        Use:   "frpc",
        Short: "frpc is the client of frp (https://github.com/fatedier/frp)",
        RunE: func(cmd *cobra.Command, args []string) error {
                if showVersion {
                        fmt.Println(version.Full())
                        return nil
                }

                // Do not show command usage here.
                err := runClient(cfgFile)
                if err != nil {
                        fmt.Println(err)
                        os.Exit(1)
                }
                return nil
        },
}

func Execute() {
        if err := rootCmd.Execute(); err != nil {
                os.Exit(1)
        }
}

然后我们看 runClient 函数:

func runClient(cfgFilePath string) (err error) {
        var content string
        content, err = config.GetRenderedConfFromFile(cfgFilePath)
        if err != nil {
                return
        }
        g.GlbClientCfg.CfgFile = cfgFilePath

        err = parseClientCommonCfg(CfgFileTypeIni, content)
        if err != nil {
                return
        }

        pxyCfgs, visitorCfgs, err := config.LoadAllConfFromIni(g.GlbClientCfg.User, content, g.GlbClientCfg.Start)
        if err != nil {
                return err
        }

        err = startService(pxyCfgs, visitorCfgs)
        return
}

基本上就是解析配置文件(因为frpc启动的时候要一个配置文件),然后执行 startService

func startService(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) (err error) {
        log.InitLog(g.GlbClientCfg.LogWay, g.GlbClientCfg.LogFile, g.GlbClientCfg.LogLevel, g.GlbClientCfg.LogMaxDays)
        if g.GlbClientCfg.DnsServer != "" {
                s := g.GlbClientCfg.DnsServer
                if !strings.Contains(s, ":") {
                        s += ":53"
                }
                // Change default dns server for frpc
                net.DefaultResolver = &net.Resolver{
                        PreferGo: true,
                        Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
                                return net.Dial("udp", s)
                        },
                }
        }
        svr, errRet := client.NewService(pxyCfgs, visitorCfgs)
        if errRet != nil {
                err = errRet
                return
        }

        // Capture the exit signal if we use kcp.
        if g.GlbClientCfg.Protocol == "kcp" {
                go handleSignal(svr)
        }

        err = svr.Run()
        if g.GlbClientCfg.Protocol == "kcp" {
                <-kcpDoneCh
        }
        return
}

同样的,执行 client.NewService 之后执行 svr.Run(),我们看看 svr.Run() 是什么:

func (svr *Service) Run() error {
	// first login
	for {
		conn, session, err := svr.login()
		if err != nil {
			log.Warn("login to server failed: %v", err)

			// if login_fail_exit is true, just exit this program
			// otherwise sleep a while and try again to connect to server
			if g.GlbClientCfg.LoginFailExit {
				return err
			} else {
				time.Sleep(10 * time.Second)
			}
		} else {
			// login success
			ctl := NewControl(svr.runId, conn, session, svr.pxyCfgs, svr.visitorCfgs)
			ctl.Run()
			svr.ctlMu.Lock()
			svr.ctl = ctl
			svr.ctlMu.Unlock()
			break
		}
	}

	go svr.keepControllerWorking()

	if g.GlbClientCfg.AdminPort != 0 {
		err := svr.RunAdminServer(g.GlbClientCfg.AdminAddr, g.GlbClientCfg.AdminPort)
		if err != nil {
			log.Warn("run admin server error: %v", err)
		}
		log.Info("admin server listen on %s:%d", g.GlbClientCfg.AdminAddr, g.GlbClientCfg.AdminPort)
	}

	<-svr.closedCh
	return nil
}

可以看到,客户端启动之后,就是一个 for 循环,写入 login 信息,也就是刚才 frps 里的 msg.loginMsg
然后起一个 goroutine 执行 keepControllerWorking(),之后主 goroutine 就阻塞在 <-svr.closedCh
看看 keepControllerWorking() 的内容:

func (svr *Service) keepControllerWorking() {
	maxDelayTime := 20 * time.Second
	delayTime := time.Second

	for {
		<-svr.ctl.ClosedDoneCh()
		if atomic.LoadUint32(&svr.exit) != 0 {
			return
		}

		for {
			log.Info("try to reconnect to server...")
			conn, session, err := svr.login()
			if err != nil {
				log.Warn("reconnect to server error: %v", err)
				time.Sleep(delayTime)
				delayTime = delayTime * 2
				if delayTime > maxDelayTime {
					delayTime = maxDelayTime
				}
				continue
			}
			// reconnect success, init delayTime
			delayTime = time.Second

			ctl := NewControl(svr.runId, conn, session, svr.pxyCfgs, svr.visitorCfgs)
			ctl.Run()
			svr.ctlMu.Lock()
			svr.ctl = ctl
			svr.ctlMu.Unlock()
			break
		}
	}
}

基本上就是一个循环,里面最终是为了成功连接然后执行 ctl.Run()

func (ctl *Control) Run() {
	go ctl.worker()

	// start all proxies
	ctl.pm.Reload(ctl.pxyCfgs)

	// start all visitors
	go ctl.vm.Run()
	return
}

// If controler is notified by closedCh, reader and writer and handler will exit
func (ctl *Control) worker() {
	go ctl.msgHandler()
	go ctl.reader()
	go ctl.writer()

	select {
	case <-ctl.closedCh:
		// close related channels and wait until other goroutines done
		close(ctl.readCh)
		ctl.readerShutdown.WaitDone()
		ctl.msgHandlerShutdown.WaitDone()

		close(ctl.sendCh)
		ctl.writerShutdown.WaitDone()

		ctl.pm.Close()
		ctl.vm.Close()

		close(ctl.closedDoneCh)
		if ctl.session != nil {
			ctl.session.Close()
		}
		return
	}
}

// msgHandler handles all channel events and do corresponding operations.
func (ctl *Control) msgHandler() {
	defer func() {
		if err := recover(); err != nil {
			ctl.Error("panic error: %v", err)
			ctl.Error(string(debug.Stack()))
		}
	}()
	defer ctl.msgHandlerShutdown.Done()

	hbSend := time.NewTicker(time.Duration(g.GlbClientCfg.HeartBeatInterval) * time.Second)
	defer hbSend.Stop()
	hbCheck := time.NewTicker(time.Second)
	defer hbCheck.Stop()

	ctl.lastPong = time.Now()

	for {
		select {
		case <-hbSend.C:
			// send heartbeat to server
			ctl.Debug("send heartbeat to server")
			ctl.sendCh <- &msg.Ping{}
		case <-hbCheck.C:
			if time.Since(ctl.lastPong) > time.Duration(g.GlbClientCfg.HeartBeatTimeout)*time.Second {
				ctl.Warn("heartbeat timeout")
				// let reader() stop
				ctl.conn.Close()
				return
			}
		case rawMsg, ok := <-ctl.readCh:
			if !ok {
				return
			}

			switch m := rawMsg.(type) {
			case *msg.ReqWorkConn:
				go ctl.HandleReqWorkConn(m)
			case *msg.NewProxyResp:
				ctl.HandleNewProxyResp(m)
			case *msg.Pong:
				ctl.lastPong = time.Now()
				ctl.Debug("receive heartbeat from server")
			}
		}
	}
}

// reader read all messages from frps and send to readCh
func (ctl *Control) reader() {
	defer func() {
		if err := recover(); err != nil {
			ctl.Error("panic error: %v", err)
			ctl.Error(string(debug.Stack()))
		}
	}()
	defer ctl.readerShutdown.Done()
	defer close(ctl.closedCh)

	encReader := crypto.NewReader(ctl.conn, []byte(g.GlbClientCfg.Token))
	for {
		if m, err := msg.ReadMsg(encReader); err != nil {
			if err == io.EOF {
				ctl.Debug("read from control connection EOF")
				return
			} else {
				ctl.Warn("read error: %v", err)
				ctl.conn.Close()
				return
			}
		} else {
			ctl.readCh <- m
		}
	}
}

// writer writes messages got from sendCh to frps
func (ctl *Control) writer() {
	defer ctl.writerShutdown.Done()
	encWriter
            
回帖
  • 消灭零回复
相关主题
2020年最新最新Kubernetes视频教程(K8s)教程 2
程序员转型之制作网课变现,月入过万告别996 1
索鸟快传2.0发布啦 1
两个不同网络的电脑怎么实现文件的互相访问呢? 1
网盘多账号登录软件 1
Java实战闲云旅游项目基于vue+element-ui 1
单点登录技术解决方案基于OAuth2.0的网关鉴权RSA算法生成令牌 1
QT5获取剪贴板上文本信息QT设置剪贴板内容 1
springboot2实战在线购物系统电商系统 1
python web实战之爱家租房项目 1
windows COM实用入门教程 1
C++游戏开发之C++实现的水果忍者游戏 1
计算机视觉库opencv教程 1
node.js实战图书管理系统express框架实现 1
C++实战教程之远程桌面远程控制实战 1
相关主题
PHP7报A non well formed numeric value encountered 0
Linux系统下关闭mongodb的几种命令分享 0
mongodb删除数据、删除集合、删除数据库的命令 0
Git&Github极速入门与攻坚实战课程 0
python爬虫教程使用Django和scrapy实现 0
libnetsnmpmibs.so.31: cannot open shared object file 0
数据结构和算法视频教程 0
redis的hash结构怎么删除数据呢? 0
C++和LUA解析器的数据交互实战视频 0
mongodb errmsg" : "too many users are authenticated 0
C++基础入门视频教程 0
用30个小时精通C++视频教程可能吗? 0
C++分布式多线程游戏服务器开发视频教程socket tcp boost库 0
C++培训教程就业班教程 0
layui的util工具格式时间戳为字符串 0
C++实战教程之远程桌面远程控制实战 1
网络安全培训视频教程 0
LINUX_C++软件工程师视频教程高级项目实战 0
C++高级数据结构与算法视频教程 0
跨域问题很头疼?通过配置nginx轻松解决ajax跨域问题 0