← Back

Netpoll Source Code Analysis Netpoll 源码分析 Netpoll ソースコード解析

May 13, 2023

Netpoll is ByteDance's proprietary networking library, serving as the network communication layer for their self-developed RPC framework KiteX. Here we briefly analyze its code module by module. This analysis is based on commit cdac6f0c. This post traces the core execution pathway.

Netpoll 是字节跳动自研网络库,被作为自研RPC框架 KiteX 的网络通信层。这里我们分模块对其代码作一简要分析,本分析基于 commit cdac6f0c。本篇分析其核心执行链路。

Netpoll はByteDance独自のネットワークライブラリで、自社開発RPCフレームワーク KiteX のネットワーク通信層です。ここではモジュールごとにコードを簡単に分析します。この分析はcommit cdac6f0c に基づきます。この記事ではコアの実行パスを追います。

1. Echo Server Example

1. Echo Server 示例

1. エコーサーバーの例

Let's start with a simple echo server:

首先观察一个简单的 echo server:

まず、シンプルなエコーサーバーを見てみましょう:

package main

import (
    "context"
    "time"

    "github.com/cloudwego/netpoll"
)

func main() {
    network, address := "tcp", ":8080"
    listener, _ := netpoll.CreateListener(network, address)

    eventLoop, _ := netpoll.NewEventLoop(
        handle,
        netpoll.WithOnPrepare(prepare),
        netpoll.WithReadTimeout(time.Second),
    )
    eventLoop.Serve(listener)
}

var _ netpoll.OnPrepare = prepare
var _ netpoll.OnRequest = handle

func prepare(connection netpoll.Connection) context.Context {
    return context.Background()
}

func handle(ctx context.Context, connection netpoll.Connection) error {
    reader := connection.Reader()
    writer := connection.Writer()
    defer reader.Release()
    msg, _ := reader.ReadString(reader.Len())
    writer.WriteString(msg)
    writer.Flush()
    println(msg)
    return nil
}

The two initialization interfaces are:

两个初始化接口:

2つの初期化インターフェース:

func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error)
func CreateListener(network, addr string) (l Listener, err error)

They create an event loop and a listener respectively. The listener is passed as an argument to the event loop's serve call. A native net.Listener can also be used here.

分别用于创建事件循环监听器对象,listener 作为参数参与事件循环的服务启动调用。注意这里也可以直接使用原生的 net.Listener

それぞれイベントループリスナーを作成します。リスナーはイベントループのserve呼び出しに引数として渡されます。ネイティブのnet.Listenerも使用できます。

func (evl *eventLoop) Serve(ln net.Listener) error

The event loop is essentially what we normally think of as a Server object — naming it eventLoop more clearly reflects the underlying epoller nature. The required parameter onRequest specifies the handler for processing requests. Its signature is:

这里的事件循环实质就是通常理解的 Server 对象,采用 eventLoop 的名称可以比较清晰地体现其底层 epoller 的本质。eventLoop 对象创建时的必选参数 onRequest 用于指定处理请求的 handler,其函数签名为:

イベントループは通常Serverオブジェクトと考えるものの本質で、eventLoopという名前は基礎となるepollernatureをより明確に反映します。必須パラメータonRequestはリクエスト処理ハンドラを指定します。そのシグネチャは:

type OnRequest func(ctx context.Context, connection Connection) error

It receives a Connection object representing a network connection. The most important interface methods are Reader() and Writer(), which expose the underlying read/write operations:

其传递一个表征网络连接的 Connection 对象用于读写单个连接实例,其最重要的接口方法是通过 Reader()Writer() 以暴露出底层读写操作:

ネットワーク接続を表すConnectionオブジェクトを受け取ります。最も重要なインターフェースメソッドはReader()Writer()で、基礎となる読み書き操作を公開します:

type Connection interface {
    net.Conn
    Reader() Reader
    Writer() Writer
    // ...
}

In the echo server above, we read and write through:

在上述 echo server 的例子中,我们通过以下方法对连接执行读写:

上記のエコーサーバーでは、次のメソッドで読み書きします:

Reader.ReadString(n int) (s string, err error)
Writer.WriteString(s string) (n int, err error)

This section introduced the core components of a simple echo server: EventLoop, Listener, and the connection management and I/O primitives Connection, Reader, and Writer.

以上,本节介绍了一个简单的 echo server 的组分,涉及构建服务器的核心对象 EventLoopListener,以及连接管理和读写相关装置 ConnectionReaderWriter

このセクションでは、シンプルなエコーサーバーのコアコンポーネントを紹介しました:EventLoopListener、および接続管理とI/OプリミティブConnectionReaderWriter

2. Creating the epoll_wait Loop

2. epoll_wait 循环的创建

2. epoll_wait ループの作成

We know that eventLoop.Serve starts the service — from here we can trace netpoll's core call chain. Section 2.1 covers module initialization and 2.2 covers FDOperator, an important abstraction in netpoll.

我们已经知道,eventLoop.Serve 方法实现了服务启动,我们可以由此出发观测 netpoll 的核心调用链路。2.1 先讨论模块初始化,2.2 讨论 netpoll 中一个重要的抽象 FDOperator

eventLoop.Serveがサービスを開始することはわかっています。ここからnetpollのコアコールチェーンをトレースします。2.1ではモジュール初期化、2.2ではnetpollの重要な抽象概念FDOperatorを扱います。

2.1 Initialization

2.1 初始化

2.1 初期化

During module init(), netpoll creates a pollmanager to manage a pool of pollers:

在模块 init() 时,netpoll 就创建了一个 pollmanager 对象管理 poll 池:

モジュールのinit()で、netpollはpollerのプールを管理するpollmanagerを作成します:

func init() {
    var loops = runtime.GOMAXPROCS(0)/20 + 1
    pollmanager = &manager{}
    pollmanager.SetLoadBalance(RoundRobin)
    pollmanager.SetNumLoops(loops)

    setLoggerOutput(os.Stderr)
}

The netpoll developers found through experimentation that a single epoller can handle up to 20 CPU cores, so the poll count is preset to CoreNum/20 + 1. manager.SetNumLoops also allows the poll count to be modified dynamically at runtime:

netpoll 开发者通过实验发现单个 epoller 最高能够负载 20 Core,因此在初始化时将 poll 预先设定为 CoreNum/20+1。manager.SetNumLoops 也提供了运行时动态修改 poll 数量的能力:

netpollの開発者は実験を通じて、単一のepollが最大20 CPUコアを処理できることを発見し、poll数をCoreNum/20+1に設定しています。manager.SetNumLoopsはランタイムでpoll数を動的に変更する機能も提供します:

// SetNumLoops will return error when set numLoops < 1
func (m *manager) SetNumLoops(numLoops int) error {
    if numLoops < 1 {
        return fmt.Errorf("set invalid numLoops[%d]", numLoops)
    }
    if numLoops < m.NumLoops {
        // if less than, close the redundant pollers
        var polls = make([]Poll, numLoops)
        for idx := 0; idx < m.NumLoops; idx++ {
            if idx < numLoops {
                polls[idx] = m.polls[idx]
            } else {
                if err := m.polls[idx].Close(); err != nil {
                    logger.Printf("NETPOLL: poller close failed: %v\n", err)
                }
            }
        }
        m.NumLoops = numLoops
        m.polls = polls
        m.balance.Rebalance(m.polls)
        return nil
    }

    m.NumLoops = numLoops
    return m.Run()
}

Note that SetNumLoops only sets the count and allocates storage. The actual work happens in manager.Run(), which calls openPoll() to create actual poll objects and poll.Wait() to launch the core epoll_wait loop:

注意 SetNumLoops 实际上只是设定了 poll 数量并创建了存储空间,之后 manager.Run()openPoll() 创建实际的 poll 对象,以 poll.Wait() 启动核心 epollWait 循环:

SetNumLoopsはカウントを設定してストレージを確保するだけです。実際の処理はmanager.Run()で行われ、openPoll()で実際のpollオブジェクトを作成し、poll.Wait()でコアのepoll_waitループを起動します:

// Run all pollers.
func (m *manager) Run() error {
    // new poll to fill delta.
    for idx := len(m.polls); idx < m.NumLoops; idx++ {
        var poll = openPoll()
        m.polls = append(m.polls, poll)
        go poll.Wait()
    }
    // LoadBalance must be set before calling Run, otherwise it will panic.
    m.balance.Rebalance(m.polls)
    return nil
}

The key behaviors inside openPoll() are: calling EPOLL_CREATE to create an epoll fd; and creating an eventfd for the poll itself (without starting the epoll loop yet), enabling it to respond to poll.Trigger() and poll.Close().

openPoll() 内部的关键行为是:调用 EPOLL_CREATE 创建 epoll fd;为 poll 本身创建一个 eventfd 并绑定到自身(此时还没有真正启动 epoll 循环),从而能够响应用户对事件循环的主动触发 poll.Trigger() 和 poll 关闭事件 poll.Close()

openPoll()の主要な動作:EPOLL_CREATEでepoll fdを作成し、poll自体のeventfdを作成(まだepollループを開始しない)して、poll.Trigger()poll.Close()に応答できるようにします。

defaultPoll.Wait() is the server's core epoll loop:

defaultPoll.Wait(),服务器的核心 epoll 循环:

defaultPoll.Wait()はサーバーのコアepollループです:

// Wait implements Poll.
func (p *defaultPoll) Wait() (err error) {
    // init
    var caps, msec, n = barriercap, -1, 0
    p.Reset(128, caps)
    // wait
    for {
        if n == p.size && p.size < 128*1024 {
            p.Reset(p.size<<1, caps)
        }
        n, err = EpollWait(p.fd, p.events, msec)
        if err != nil && err != syscall.EINTR {
            return err
        }
        if n <= 0 {
            msec = -1
            runtime.Gosched()
            continue
        }
        msec = 0
        if p.Handler(p.events[:n]) {
            return nil
        }
        // we can make sure that there is no op remaining if Handler finished
        p.opcache.free()
    }
}

The method does some performance-oriented timeout tuning that is not immediately obvious (to be discussed later). The core is EpollWait() and p.Handler(), which dispatches concrete events when they arrive.

该方法为了性能做了一些不太容易理解的超时设定工作(在之后的分析中再详细讨论)。核心在于 EpollWait() 以及在有可用事件时执行 p.Handler() 来对具体事件作出响应。

このメソッドはわかりにくい性能指向のタイムアウト調整を行っています(後の分析で詳細に説明します)。コアはEpollWait()と、イベントが到着したときに具体的なイベントをディスパッチするp.Handler()です。

Therefore, the actual epoll loop (reactor) completes as early as the init phase (i.e., on import netpoll). The eventLoop.Serve method itself is merely the process of binding the listener's file descriptor to one of the already-running pollers.

因此,真正意义上的 epoll 循环(reactor)早在 init 阶段(即 import netpoll 时)就已经执行完毕了,而实际的 eventLoop.Serve 方法,则只不过是将 Listener 的文件描述符绑定到某一个已运行 poll 的过程。

したがって、実際のepollループ(reactor)はinitフェーズ(つまりimport netpoll時)にすでに完了しています。eventLoop.Serveメソッド自体は、リスナーのファイルディスクリプタをすでに実行中のpollerの1つにバインドするプロセスに過ぎません。

2.2 FDOperator

2.2 FDOperator

2.2 FDOperator

FDOperator (file descriptor operator) abstracts file descriptor management for Server, Connection, and even Poll objects in netpoll. All three establish the association between an FD and a poll through FDOperator:

FDOperator(file descriptor operator)抽象了 netpoll 中的 ServerConnection,甚至 Poll 对象本身的文件描述符管理,三者均通过 FDOperator 建立 FD 和 poll 之间的关联:

FDOperator(ファイルディスクリプタオペレーター)は、netpollのServerConnection、さらにはPollオブジェクト自体のファイルディスクリプタ管理を抽象化します。3つはすべてFDOperatorを通じてFDとpollの関連付けを確立します:

// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
    FD   int
    poll Poll
    // ...
}

FDOperator's most important method, Control, binds a specified PollEvent to the poll:

FDOperator 最重要的方法 Control 将指定的 poll 事件 PollEvent 绑定到 poll:

FDOperatorの最も重要なメソッドControlは、指定されたPollEventをpollにバインドします:

func (op *FDOperator) Control(event PollEvent) error {
    return op.poll.Control(op, event)
}

The underlying binding calls syscall.SYS_EPOLL_CTL:

最终调用的绑定过程即 syscall.SYS_EPOLL_CTL

基礎となるバインディングはsyscall.SYS_EPOLL_CTLを呼び出します:

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
    var op int
    var evt epollevent
    p.setOperator(unsafe.Pointer(&evt.data), operator)
    switch event {
    case PollReadable: // server accept a new connection and wait read
        operator.inuse()
        op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
    case PollWritable: // client create a new connection and wait connect finished
        operator.inuse()
        op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
    }
    return EpollCtl(p.fd, op, operator.FD, &evt)
}
// EpollCtl implements epoll_ctl.
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
    _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
    if err == syscall.Errno(0) {
        err = nil
    }
    return err
}

3. Listening and Responding to New Connection Events

3. 新建连接事件的监听和响应

3. 新規接続イベントの監視と応答

3.1 eventLoop.Serve

3.1 eventLoop.Serve

3.1 eventLoop.Serve

Serve creates a Server object via newServer to store the listener and user-specified options:

Serve 中通过 newServer 创建一个 Server 对象存储 Listener 和用户指定的 options:

ServenewServerを通じてリスナーとユーザー指定オプションを格納するServerオブジェクトを作成します:

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
    npln, err := ConvertListener(ln)
    if err != nil {
        return err
    }
    evl.Lock()
    evl.svr = newServer(npln, evl.opts, evl.quit)
    evl.svr.Run()
    evl.Unlock()

    err = evl.waitQuit()
    // ensure evl will not be finalized until Serve returns
    runtime.SetFinalizer(evl, nil)
    return err
}

Server.Run() then calls pollmanager.Pick() to bind to a poll, and registers readable event monitoring on the listener's fd via FDOperator.Control:

Server 随即 Run() 启动——本质上是调用 pollmanager.Pick() 方法绑定到一个 poll 上,并通过 FDOperator.Control 为 Listener 对应的文件描述符绑定 Readable 事件监听:

Server.Run()pollmanager.Pick()を呼び出してpollにバインドし、FDOperator.Controlを通じてリスナーのfdに読み取り可能イベントの監視を登録します:

// Run this server.
func (s *server) Run() (err error) {
    s.operator = FDOperator{
        FD:     s.ln.Fd(),
        OnRead: s.OnRead,
        OnHup:  s.OnHup,
    }
    s.operator.poll = pollmanager.Pick()
    err = s.operator.Control(PollReadable)
    if err != nil {
        s.onQuit(err)
    }
    return err
}

When a new connection arrives, the handler branch fires:

新连接建立事件所对应的 handler 分支为:

新しい接続が到着すると、ハンドラブランチが発火します:

// for non-connection
operator.OnRead(p)

This calls server.OnRead, which accepts the connection and initializes a connection object for it:

其中调用 server.OnRead,接受连接并为其初始化一个 connection 对象:

これはserver.OnReadを呼び出し、接続を受け入れてそのためのconnectionオブジェクトを初期化します:

func (s *server) OnRead(p Poll) error {
    // accept socket
    conn, err := s.ln.Accept()
    if err != nil {
        // shut down
        if strings.Contains(err.Error(), "closed") {
            s.operator.Control(PollDetach)
            s.onQuit(err)
            return err
        }
        logger.Println("NETPOLL: accept conn failed:", err.Error())
        return err
    }
    if conn == nil {
        return nil
    }
    // store & register connection
    var connection = &connection{}
    connection.init(conn.(Conn), s.opts)
    if !connection.IsActive() {
        return nil
    }
    var fd = conn.(Conn).Fd()
    connection.AddCloseCallback(func(connection Connection) error {
        s.connections.Delete(fd)
        return nil
    })
    s.connections.Store(fd, connection)

    // trigger onConnect asynchronously
    connection.onConnect()
    return nil
}

The key call here is connection.init(), which initializes the connection object and registers it for read/write event monitoring (see the next section).

其中的关键调用是通过 connection.init() 初始化连接对象,并监听该连接上发生的读写事件(见下节)。

ここでの重要な呼び出しはconnection.init()で、接続オブジェクトを初期化し、読み書きイベントの監視に登録します(次のセクションを参照)。

4. Listening and Responding to Connection Read/Write Events

4. 连接读写事件的监听和响应

4. 接続の読み書きイベントの監視と応答

During connection.init(), the FDOperator is initialized and connection.register() is triggered through connection.onPrepare():

connection.init() 时,初始化 FDOperator,并通过 connection.onPrepare() 触发 connection.register()

connection.init()では、FDOperatorが初期化され、connection.onPrepare()を通じてconnection.register()がトリガーされます:

// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
    // init buffer, barrier, finalizer
    c.readTrigger = make(chan struct{}, 1)
    c.writeTrigger = make(chan error, 1)
    c.bookSize, c.maxSize = block1k/2, pagesize
    c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
    c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)

    c.initNetFD(conn) // conn must be *netFD{}
    c.initFDOperator()
    c.initFinalizer()

    syscall.SetNonblock(c.fd, true)
    // enable TCP_NODELAY by default
    switch c.network {
    case "tcp", "tcp4", "tcp6":
        setTCPNoDelay(c.fd, true)
    }
    // check zero-copy
    if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
        c.supportZeroCopy = true
    }

    // connection initialized and prepare options
    return c.onPrepare(opts)
}

connection.register() performs the final fd event binding:

connection.register() 中则最终进行了连接对应文件描述符的事件绑定:

connection.register()で最終的なfdイベントバインディングが行われます:

// register only use for connection register into poll.
func (c *connection) register() (err error) {
    if c.operator.isUnused() {
        // operator is not registered
        err = c.operator.Control(PollReadable)
    } else {
        // operator is already registered
        // change event to wait read new data
        err = c.operator.Control(PollModReadable)
    }
    if err != nil {
        logger.Printf("NETPOLL: connection register failed: %v", err)
        c.Close()
        return Exception(ErrConnClosed, err.Error())
    }
    return nil
}

The handler branch for connection read events is:

连接读事件对应的 handler 分支为:

接続読み取りイベントのハンドラブランチは:

// for connection
var bs = operator.Inputs(p.barriers[i].bs)
if len(bs) > 0 {
    var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
    operator.InputAck(n)
    if err != nil {
        p.appendHup(operator)
        continue
    }
}

Write events follow the same pattern:

写事件也是类似的过程:

書き込みイベントも同じパターンに従います:

// for connection
var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
if len(bs) > 0 {
    // TODO: Let the upper layer pass in whether to use ZeroCopy.
    var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
    operator.OutputAck(n)
    if err != nil {
        p.appendHup(operator)
        continue
    }
}

At this point we have completed tracing netpoll's overall core call chain.

至此我们完成了对 netpoll 整体核心调用链路的一个跟踪。

以上で、netpollのコアコールチェーン全体のトレースが完了しました。