【转载】百万 Go TCP 连接的思考: epoll方式减少资源占用

前几天 Eran Yanay 在 Gophercon Israel 分享了一个讲座:Going Infinite, handling 1M websockets connections in Go, 介绍了使用Go实现支持百万连接的websocket服务器,引起了很大的反响。事实上,相关的技术在2017年的一篇技术中已经介绍: A Million WebSockets and Go, 这篇2017年文章的作者Sergey Kamardin也就是 Eran Yanay 项目中使用的ws库的作者。

Sergey Kamardin 在 A Million WebSockets and Go 一文中介绍了epoll的使用(mailru/easygo,支持epoll on linux, kqueue onbsd, darwin), ws的zero copy的upgrade等技术。

Eran Yanay的分享中对epoll的处理做了简化,而且提供了docker测试的脚本,很方便的在单机上进行百万连接的测试。

2015年的时候我也曾作为百万连接的websocket的服务器的比较:使用四种框架分别实现百万websocket常连接的服务器 、七种WebSocket框架的性能比较。应该说,只要服务器硬件资源足够(内存和CPU), 实现百万连接的服务器并不是很难的事情。

操作系统会为每一个连接分配一定的内存空间外(主要是内部网络数据结构sk_buff的大小、连接的读写缓存,sof),虽然这些可以进行调优,但是如果想使用正常的操作系统的TCP/IP栈的话,这些是硬性的需求。刨去这些,不同的编程语言不同的框架的设计,甚至是不同的需求场景,都会极大的影响TCP服务器内存的占用和处理。

一般Go语言的TCP(和HTTP)的处理都是每一个连接启动一个goroutine去处理,因为我们被教导goroutine的不像thread, 它是很便宜的,可以在服务器上启动成千上万的goroutine。但是对于一百万的连接,这种goroutine-per-connection的模式就至少要启动一百万个goroutine,这对资源的消耗也是极大的。针对不同的操作系统和不同的Go版本,一个goroutine锁使用的最小的栈大小是2KB ~ 8 KB (go stack),如果在每个goroutine中在分配byte buffer用以从连接中读写数据,几十G的内存轻轻松松就分配出去了。

...

服务器epoll代码实现:

  • server.go
var epoller *epoll
func main() {
    setLimit()
    ln, err := net.Listen("tcp", ":8972")
    if err != nil {
        panic(err)
    }
    go func() {
        if err := http.ListenAndServe(":6060", nil); err != nil {
            log.Fatalf("pprof failed: %v", err)
        }
    }()
    epoller, err = MkEpoll()
    if err != nil {
        panic(err)
    }
    go start()
    for {
        conn, e := ln.Accept()
        if e != nil {
            if ne, ok := e.(net.Error); ok && ne.Temporary() {
                log.Printf("accept temp err: %v", ne)
                continue
            }
            log.Printf("accept err: %v", e)
            return
        }
        if err := epoller.Add(conn); err != nil {
            log.Printf("failed to add connection %v", err)
            conn.Close()
        }
    }
}
func start() {
    var buf = make([]byte, 8)
    for {
        connections, err := epoller.Wait()
        if err != nil {
            log.Printf("failed to epoll wait %v", err)
            continue
        }
        for _, conn := range connections {
            if conn == nil {
                break
            }
            if _, err := conn.Read(buf); err != nil {
                if err := epoller.Remove(conn); err != nil {
                    log.Printf("failed to remove %v", err)
                }
                conn.Close()
            }
        }
    }
}
  • epoll实现
type epoll struct {
    fd          int
    connections map[int]net.Conn
    lock        *sync.RWMutex
}
func MkEpoll() (*epoll, error) {
    fd, err := unix.EpollCreate1(0)
    if err != nil {
        return nil, err
    }
    return &epoll{
        fd:          fd,
        lock:        &sync.RWMutex{},
        connections: make(map[int]net.Conn),
    }, nil
}
func (e *epoll) Add(conn net.Conn) error {
    // Extract file descriptor associated with the connection
    fd := socketFD(conn)
    err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_ADD, fd, &unix.EpollEvent{Events: unix.POLLIN | unix.POLLHUP, Fd: int32(fd)})
    if err != nil {
        return err
    }
    e.lock.Lock()
    defer e.lock.Unlock()
    e.connections[fd] = conn
    if len(e.connections)%100 == 0 {
        log.Printf("total number of connections: %v", len(e.connections))
    }
    return nil
}
func (e *epoll) Remove(conn net.Conn) error {
    fd := socketFD(conn)
    err := unix.EpollCtl(e.fd, syscall.EPOLL_CTL_DEL, fd, nil)
    if err != nil {
        return err
    }
    e.lock.Lock()
    defer e.lock.Unlock()
    delete(e.connections, fd)
    if len(e.connections)%100 == 0 {
        log.Printf("total number of connections: %v", len(e.connections))
    }
    return nil
}
func (e *epoll) Wait() ([]net.Conn, error) {
    events := make([]unix.EpollEvent, 100)
    n, err := unix.EpollWait(e.fd, events, 100)
    if err != nil {
        return nil, err
    }
    e.lock.RLock()
    defer e.lock.RUnlock()
    var connections []net.Conn
    for i := 0; i < n; i++ {
        conn := e.connections[int(events[i].Fd)]
        connections = append(connections, conn)
    }
    return connections, nil
}
func socketFD(conn net.Conn) int {
    //tls := reflect.TypeOf(conn.UnderlyingConn()) == reflect.TypeOf(&tls.Conn{})
    // Extract the file descriptor associated with the connection
    //connVal := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn").Elem()
    tcpConn := reflect.Indirect(reflect.ValueOf(conn)).FieldByName("conn")
    //if tls {
    //    tcpConn = reflect.Indirect(tcpConn.Elem())
    //}
    fdVal := tcpConn.FieldByName("fd")
    pfdVal := reflect.Indirect(fdVal).FieldByName("pfd")
    return int(pfdVal.FieldByName("Sysfd").Int())
}

原文地址:百万 Go TCP 连接的思考: epoll方式减少资源占用