golang网络编程之基于TCP协议实现长连接 golang心跳检测

suoniao 2021-01-28
需要:0索币
package main

// golang实现带有心跳检测的tcp长连接
// server
import (
	"fmt"
	"net"
	"time"
)

// message struct:
// c#d

var (
	Req_REGISTER byte = 1 // 1 --- c register cid
	Res_REGISTER byte = 2 // 2 --- s response

	Req_HEARTBEAT byte = 3 // 3 --- s send heartbeat req
	Res_HEARTBEAT byte = 4 // 4 --- c send heartbeat res

	Req byte = 5 // 5 --- cs send data
	Res byte = 6 // 6 --- cs send ack
)

type CS struct {
	Rch chan []byte
	Wch chan []byte
	Dch chan bool
	u   string
}

func NewCs(uid string) *CS {
	return &CS{Rch: make(chan []byte), Wch: make(chan []byte), u: uid}
}

var CMap map[string]*CS

func main() {
	CMap = make(map[string]*CS)
	listen, err := net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP("127.0.0.1"), 6666, ""})
	if err != nil {
		fmt.Println("监听端口失败:", err.Error())
		return
	}
	fmt.Println("已初始化连接,等待客户端连接...")
	go PushGRT()
	Server(listen)
	select {}
}

func PushGRT() {
	for {
		time.Sleep(15 * time.Second)
		for k, v := range CMap {
			fmt.Println("push msg to user:" + k)
			v.Wch <- []byte{Req, '#', 'p', 'u', 's', 'h', '!'}
		}
	}
}

func Server(listen *net.TCPListener) {
	for {
		conn, err := listen.AcceptTCP()
		if err != nil {
			fmt.Println("接受客户端连接异常:", err.Error())
			continue
		}
		fmt.Println("客户端连接来自:", conn.RemoteAddr().String())
		// handler goroutine
		go Handler(conn)
	}
}

func Handler(conn net.Conn) {
	defer conn.Close()
	data := make([]byte, 128)
	var uid string
	var C *CS
	for {
		conn.Read(data)
		fmt.Println("客户端发来数据:", string(data))
		if data[0] == Req_REGISTER { // register
			conn.Write([]byte{Res_REGISTER, '#', 'o', 'k'})
			uid = string(data[2:])
			C = NewCs(uid)
			CMap[uid] = C
			//			fmt.Println("register client")
			//			fmt.Println(uid)
			break
		} else {
			conn.Write([]byte{Res_REGISTER, '#', 'e', 'r'})
		}
	}
	//	WHandler
	go WHandler(conn, C)

	//	RHandler
	go RHandler(conn, C)

	//	Worker
	go Work(C)
	select {
	case <-C.Dch:
		fmt.Println("close handler goroutine")
	}
}

// 正常写数据
// 定时检测 conn die => goroutine die
func WHandler(conn net.Conn, C *CS) {
	// 读取业务Work 写入Wch的数据
	ticker := time.NewTicker(20 * time.Second)
	for {
		select {
		case d := <-C.Wch:
			conn.Write(d)
		case <-ticker.C:
			if _, ok := CMap[C.u]; !ok {
				fmt.Println("conn die, close WHandler")
				return
			}
		}
	}
}

// 读客户端数据 + 心跳检测
func RHandler(conn net.Conn, C *CS) {
	// 心跳ack
	// 业务数据 写入Wch

	for {
		data := make([]byte, 128)
		// setReadTimeout
		err := conn.SetReadDeadline(time.Now().Add(10 * time.Second))
		if err != nil {
			fmt.Println(err)
		}
		if _, derr := conn.Read(data); derr == nil {
			// 可能是来自客户端的消息确认
			//           	     数据消息
			fmt.Println(data)
			if data[0] == Res {
				fmt.Println("recv client data ack")
			} else if data[0] == Req {
				fmt.Println("recv client data")
				fmt.Println(data)
				conn.Write([]byte{Res, '#'})
				// C.Rch <- data
			}

			continue
		}

		conn.Write([]byte{Req_HEARTBEAT, '#'})
		fmt.Println("send ht packet")
		conn.SetReadDeadline(time.Now().Add(2 * time.Second))
		if _, herr := conn.Read(data); herr == nil {
			// fmt.Println(string(data))
			fmt.Println("resv ht packet ack")
		} else {
			delete(CMap, C.u)
			fmt.Println("delete user!")
			return
		}
	}
}

func Work(C *CS) {
	time.Sleep(5 * time.Second)
	C.Wch <- []byte{Req, '#', 'h', 'e', 'l', 'l', 'o'}

	time.Sleep(15 * time.Second)
	C.Wch <- []byte{Req, '#', 'h', 'e', 'l', 'l', 'o'}
	// 从读ch读信息
	/*	ticker := time.NewTicker(20 * time.Second)
		for {
			select {
			case d := <-C.Rch:
				C.Wch <- d
			case <-ticker.C:
				if _, ok := CMap[C.u]; !ok {
					return
				}
			}

		}
	*/ // 往写ch写信息
}
package main

// golang实现带有心跳检测的tcp长连接
// server

import (
	"fmt"
	"net"
)

var (
	Req_REGISTER byte = 1 // 1 --- c register cid
	Res_REGISTER byte   = 2 // 2 --- s response

	Req_HEARTBEAT byte = 3 // 3 --- s send heartbeat req
	Res_HEARTBEAT byte = 4 // 4 --- c send heartbeat res

	Req  byte = 5 // 5 --- cs send data
	Res  byte = 6 // 6 --- cs send ack
)

var Dch chan bool
var Rch chan []byte
var Wch chan []byte

func main() {
	Dch = make(chan bool)
	Rch = make(chan []byte)
	Wch = make(chan []byte)
	addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:6666")
	conn, err := net.DialTCP("tcp", nil, addr)
//	conn, err := net.Dial("tcp", "127.0.0.1:6666")
	if err != nil {
		fmt.Println("连接服务端失败:", err.Error())
		return
	}
	fmt.Println("已连接服务器")
	defer conn.Close()
	go Handler(conn)
	select {
	    case <- Dch:
		    fmt.Println("关闭连接")
	}
}

func Handler(conn *net.TCPConn) {
	// 直到register ok
	data := make([]byte, 128)
	for {
		conn.Write([]byte{Req_REGISTER, '#', '2'})
		conn.Read(data)
//		fmt.Println(string(data))
		if data[0] == Res_REGISTER {
			break
		}
	}
//	fmt.Println("i'm register")
	go RHandler(conn)
	go WHandler(conn)
	go Work()
}

func RHandler(conn *net.TCPConn) {

	for {
		// 心跳包,回复ack
	data := make([]byte, 128)
		i,_ := conn.Read(data)
		if i == 0 {
			Dch <- true
			return
		}
		if data[0] == Req_HEARTBEAT {
			fmt.Println("recv ht pack")
			conn.Write([]byte{Res_REGISTER,'#','h'})
			fmt.Println("send ht pack ack")
		} else if data[0] == Req {
			fmt.Println("recv data pack")
			fmt.Printf("%v\n",string(data[2:]))
			Rch <- data[2:]
			conn.Write([]byte{Res,'#'})
		}
	}
}

func WHandler(conn net.Conn) {
	for {
		select {
			case msg := <- Wch:
				fmt.Println((msg[0]))
				fmt.Println("send data after: " + string(msg[1:]))
				conn.Write(msg)
		}
	}

}

func Work() {
	for {
		select {
		case msg := <- Rch:
				fmt.Println("work recv " + string(msg))
				Wch <- []byte{Req,'#','x','x','x','x','x'}
		}
	}

}

来源:https://my.oschina.net/sharelinux/blog/699725

回帖
  • 消灭零回复
相关主题
QLocalServer基于本地套接字socket的服务端server 0
golang实现内存池 go语言字节池byte pool实现代码 0
golang网络编程之基于TCP协议实现长连接 golang心跳检测 0
golang类型断言type的使用 0
golang依赖包管理 mod使用教程 0
golang表单验证库validator 0
如何使用Go语言实现一个简单的异步任务框架呢?生产者消费者模型 0
golang 网络编程设置keepAlive空闲多长时间开始探测、 探测总次数 0
golang网络编程之TCP编程详解 0
go语言利用ioutil.ReadAll读取TCP socket所有数据 0
golang利用io.copy和bytes.Buffer读取TCP socket所有数据 0
golang读取所有socket数据的方式一 0
围绕Handler接口的方法ServeHTTP,可以轻松的写出go中的中间件 0
golang语言错误处理方式check函数,把错误转化为panic 0
golang cannot find module providing package 0
golang语言错误处理errors包使用详解 0
golang基于通道channel实现一个通用连接池 pool 0
golang数据类型之map结构详解 0
golang socket关闭读导致 wsarecv: An existing connection was forcibly closed by the remote host 0
QT通过socket发送GB级别的大文件报错terminate called after throwing an instance of 'std::bad_alloc' 0
相关主题
QListWidget滚动的时候显示不完整 滚动条模式导致的哦 0
Qt设置顶层面板背景透明Qt::WA_TranslucentBackground 隐藏边框Qt::FramelessWindowHint 0
QMenu和QMenuBar样式表大全 qss 0
QT定时器startTimer和timerEvent事件 每隔interval 毫秒就会启动一次 0
C语言内存分配函数malloc和calloc的区别 0
QCheckBox的QSS样式表总结 0
Qt通过qRegisterMetaType注册自定义数据类型 0
QListwidget触发2次itemClicked事件 0
打印机USB驱动开发之实现打印服务器 0
Qt利用QLabel组件来显示图片 0
TableView自定义代理QStyledItemDelegate实现ComboBox 0
Qt利用QGraphicsView类实现图片放大缩小平移显示 0
Qt实现非阻塞延迟方法sleep 0
海康相机SDK的C++对应的接口 0
Qt实现webdav客户端功能支持https协议的webdav客户端 0
CHKDSK解决 移动硬盘只能看见盘符其它信息都看不见另外双击也打不开 0
gogs一直报errror:dial tcp xxx.xxx.xxx.xxx 宿主机的ip 0
索鸟快传2.1.2发布 0
索鸟快传2.1.1发布 0
Qt操作windows注册表的方法 bat从注册表中将键值删除 0