Rust实现带BBR的高效魔改KCP

完整的代码在此

背景

最近我在用Rust写一个ICMP隧道,因为ICMP包本身是不可靠的,于是需要在ICMP之上写一个可靠协议。一个完整的TCP协议栈显然过于臃肿了(何况也并没有现成的无IO的轮子),所以我就看上了skywind3000大佬的KCP协议——轻量、简洁、代码连我这种网路萌新都看得懂,实在是再好不过了。

一开始是我直接使用原版的C实现+FFI封装,在不开流控的情况下效果不错,但可惜原版的拥塞控制用的是最朴素的TCP Tahoe(作者也表明出于简洁性考虑不准备在标准实现当中使用复杂的流控算法),其在国际网络环境下的表现实在差强人意。想改进,但自己对于自己C的编程水平实在是不抱信心。既然Rust也是性能一流的系统编程语言,我最终还是决定用Rust再写了一个实现。本实现具有以下特点:

  • 相较于C实现进行了架构上的些许调整。
  • 在C实现的基础之上,使用链表+滚动数组优化大窗口下的发送性能。
  • 在C实现的基础之上,使用小根堆优化RTO计时器的效率,提升重传性能。
  • 将著名的BBR拥塞控制算法进行一定修改后试验性地运用到KCP中。

依赖的包

为了使编写更加简便,我们的实现依赖以下Rust crates:

  • bytes——简便的字节处理(代替原来C实现当中的encode_xxx/decode_xxx)。
  • num_enum——简化Rust枚举与字节的互相转换。
  • derivative——简化一些trait的实现。
  • thiserror——简化错误类型的定义。
  • rand——用于BBR随机相位初始化。

架构上的调整

常量与配置

相对于C实现,本实现大幅减少了常量的数量。最后仅剩的常量有五:

/// KCP包头大小
const OVERHEAD: u32 = 24;
/// 最大分段
const MAX_FRAGMENTS: u16 = 128;
/// KCP段类型
#[derive(Debug, Clone, Copy, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
enum Command {
    Push = 81,
    Ack = 82,
    AskWnd = 83,
    TellWnd = 84,
}
/// BBR各阶段的增益
const BBR_GAIN_CYCLE: [usize; 8] = [5, 3, 4, 4, 4, 4, 4, 4];
/// BDP增益的分母,见后文
const BDP_GAIN_DEN: usize = 1024;

常量少了,变量自然就多了,原来C实现的常量在本实现中成为可配置项:

/// 大部分配置的意思如字面
#[derive(Clone, Debug, Deserialize, Derivative)]
#[derivative(Default)]
pub struct Config {
    #[derivative(Default(value = "536"))]
    pub mtu: u32,
    #[derivative(Default(value = "200"))]
    pub rto_default: u32,
    #[derivative(Default(value = "100"))]
    pub rto_min: u32,
    #[derivative(Default(value = "6000"))]
    pub rto_max: u32,
    #[derivative(Default(value = "7000"))]
    pub probe_min: u32,
    #[derivative(Default(value = "120000"))]
    pub probe_max: u32,
    #[derivative(Default(value = "1024"))]
    pub send_wnd: u16,
    #[derivative(Default(value = "1024"))]
    pub recv_wnd: u16,
    #[derivative(Default(value = "40"))]
    pub interval: u32,
    /// 若一个包重传dead_link_thres次后依然失败,则视作底层链路失效。
    #[derivative(Default(value = "20"))]
    pub dead_link_thres: u32,
    /// nodelay模式下, rto_min = 0且rto在重传失败后不指数增长。
    #[derivative(Default(value = "false"))]
    pub nodelay: bool,
    /// stream模式下, 多个数据包可以被合并在同一段内从而减少开销。
    #[derivative(Default(value = "false"))]
    pub stream: bool,
    /// 如果指定,则一个包在fast_resend_thres个在其之后的包ACK之后会直接重传
    #[derivative(Default(value = "None"))]
    pub fast_resend_thres: Option<u32>,
    /// 快速重传的次数上限
    #[derivative(Default(value = "None"))]
    pub fast_resend_limit: Option<u32>,
    /// 是否启用BBR控制算法
    #[derivative(Default(value = "false"))]
    pub bbr: bool,
    /// BBR中RTprop(往返时间)滑动窗口的时间长度(单位:毫秒)
    #[derivative(Default(value = "10000"))]
    pub rt_prop_wnd: u32,
    /// BBR中BtlBw(瓶颈带宽)滑动串口的长度(单位:RTT)
    #[derivative(Default(value = "10"))]
    pub btl_bw_wnd: u32,
    /// BBR中一次RTT/RTprop探测的时间(单位:RTT),减少该值可以减轻RTT探测对于流量的影响。
    #[derivative(Default(value = "200"))]
    pub probe_rtt_time: u32,
    /// BDP增益,见后文
    #[derivative(Default(value = "1024"))]
    pub bdp_gain: usize,
}

impl Config {
    pub fn mss(&self) -> usize {
        (self.mtu - OVERHEAD) as usize
    }
}

impl ControlBlock {
    pub fn new(conv: u32, config: Config) -> ControlBlock {
        ...
    }
}

异常类型

KCP原本的C实现仅使用负数表达异常值,虽简介但其含义并不明晰,在本实现中我们对于异常进行了清晰定义:

#[derive(Debug, Error)]
pub enum Error {
    #[error("packet to be sent too large to be fragmented")]
    OversizePacket,
    #[error("incomplete KCP packet")]
    IncompletePacket,
    #[error("invalid KCP command: {0}")]
    InvalidCommand(u8),
    #[error("empty queue (try again later)")]
    NotAvailable,
    #[error("wrong conv. (expected {expected}, found {found})")]
    WrongConv { expected: u32, found: u32 },
}

pub type Result<T> = std::result::Result<T, Error>;

以上异常类型还有更精确的空间,但是目前应该已经堪堪够用了。

发包方式

在原来的C实现在发包时直接调用callback,其优点是简洁,但其缺点在于callback的运行时间不定以及异常处理不明对运行产生的影响。何况在Rust当中安全存储callback需要和borrow checker拼命。在本实现中,我们将flush出去的包暂存在一个队列中,然后通过外部不断poll的方式拉出去。一方面,主动poll的方式和底层收到包时的push呼应;另一方面,这有助于分离底层发包和KCP逻辑本身,是“无IO/Sans IO”理念的一种体现。缺点是缓存队列可能会膨胀得厉害。当中tradeoff见仁见智。

impl ControlBlock {
    ...
    /// 底层收包push
    pub fn input(&mut self, mut data: &[u8]) -> Result<usize> { ... }
    /// 底层发包poll
    pub fn output(&mut self) -> Option<Vec<u8>> { ... }
}

去除checkupdate

这是一个比较大胆的改动,未必适合所有情形。去除的原因是在数据结构的优化下计算重传、更新发送窗口的开销大幅度减小,已经可以在每一次调用inputsend的时候进行一次,没有必要去不断checkupdate。上层只需要按照固定的时间间隔调用flush就行了。我进行这样的设计是为了简化上层的代码,而且我的应用情形恰好是高流量的反正都要一直flush,也无所谓。

flush的代码也其实很简单:

pub fn flush(&mut self) {
    self.sync_now(); // 更新now
    self.flush_probe(); // 更新窗口探测
    self.flush_push(); // 计算重传以及更新发送窗口
    self.flush_ack(); // 发ACK
    if !self.buffer.is_empty() {
        let mut new_buf = Vec::with_capacity(self.config.mtu as usize);
        std::mem::swap(&mut self.buffer, &mut new_buf);
        self.output.push_back(new_buf);
    }
}

既然sync_nowflush_pushinputsend当中都可以廉价地调用,那为什么还需要不断checkupdate呢?直接调用flush了事。

如果要参考有checkupdate的实现,可以参照早些时候的commit

窗口数据结构的改进

KCP原版的实现中发送/接收的队列/窗口全部使用队列作为数据结构,这固然使得代码变简单了,但也一定程度上降低了性能:在队列中查找KCP段最差需要线性时间,这在某些情形下未必是最优的。在本实现中,我们优化数据结构,以最优的复杂度实现发送/接受窗口需要的若干操作:

  • \mathcal{O}(1)以分段的序号为键查找分段。
  • \mathcal{O}(1)插入分段。
  • \mathcal{O}(1)以分段的序号为键查找分段。
  • \mathcal{O}(1)以分段的序号为键删除分段。
  • \mathcal{O}(1)查询/弹出最早插入的分段(在发送窗口中,最早插入的分段自然是序号最小的分段)。
  • \mathcal{O}(k)遍历以插入顺序为序,某分段的所有k个前驱(在发送窗口中,分段插入顺序即序号顺序,因此该操作可直接用于快速重传的计算)。
  • \mathcal{O}(1)查询大小。

考虑到任何时刻窗口内分段序号之差不会大于窗口大小这一常数,符合上述要求的数据结构就可以用链表+滚动数组高效实现。代码不长,百行左右:

struct Element<T> {
    /// 前驱下标
    prev: usize,
    /// 后继下标
    next: usize,
    data: T,
}

pub struct Window<T> {
    size: usize,
    entry: Vec<Option<Element<T>>>,
    end: Option<usize>,
    len: usize,
}

impl<T> Window<T> {
    pub fn with_size(size: usize) -> Self {
        Self {
            size,
            entry: (0..size).map(|_| None).collect(),
            end: None,
            len: 0,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.end.is_none()
    }

    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
        match self.entry[index % self.size].as_mut() {
            Some(elem) => Some(&mut elem.data),
            None => None,
        }
    }

    pub fn push(&mut self, index: usize, data: T) {
        let index = index % self.size;
        if self.entry[index].is_some() {
            return;
        }
        self.entry[index] = Some(match self.end {
            Some(prev) => {
                let prev_elem = self.entry[prev].as_mut().unwrap();
                let next = prev_elem.next;
                prev_elem.next = index;
                self.entry[next].as_mut().unwrap().prev = index;
                Element { prev, next, data }
            }
            None => Element { prev: index, next: index, data },
        });
        self.end = Some(index);
        self.len += 1;
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        let index = index % self.size;
        let elem = self.entry[index].take()?;
        let (prev, next) = (elem.prev, elem.next);
        self.entry[index] = None;
        self.len -= 1;
        if index == self.end.unwrap() {
            if prev == index {
                self.end = None;
                return Some(elem.data);
            } else {
                self.end = Some(prev);
            }
        }
        self.entry[prev].as_mut().unwrap().next = next;
        self.entry[next].as_mut().unwrap().prev = prev;
        Some(elem.data)
    }

    pub fn front(&self) -> Option<&T> {
        self.end.map(|end| {
            let head = self.entry[end].as_ref().unwrap().next;
            &self.entry[head].as_ref().unwrap().data
        })
    }

    pub fn pop_unchecked(&mut self) -> T {
        let end = self.end.unwrap();
        let head = self.entry[end].as_ref().unwrap().next;
        self.remove(head).unwrap()
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn for_preceding(&mut self, index: usize, mut action: impl FnMut(&mut T)) {
        let mut index = index % self.size;
        index = match self.entry[index].as_ref() {
            Some(elem) => elem.prev,
            None => return,
        };
        while index != self.end.unwrap() {
            let elem = self.entry[index].as_mut().unwrap();
            action(&mut elem.data);
            index = elem.prev;
        }
    }
}

因为滚动数组是连续空间,在内存布局上相较于链表对于缓存更加友好,所以速度应该还可以再快一点。唯一的不足是unwrap有点多看着心惊肉跳,并且用指针可能会比用下标快一丁点,但是用Rust写数据结构大约就是这个尿性。

有这个打底,窗口大小开到8192实测是一点问题都没有的,更大的没试过。

但KCP的设计本质上是不适合大流量的,因为快速重传无论如何优化数据结构最坏的线性复杂度就在那里无法消除,除非可以限制快速重传向后看的范围,但后者又削弱了快速重传的意义与效用。

重传计时器的改进

原版的KCP实现在check的时候需要遍历发送窗口来确定最近的重传时间,在flush的时候又要遍历才能重传,这在窗口较大的时候显然是比较吃性能的。原作者记得在issues里的讨论中提过可以用时间轮进行优化。诚然,时间轮是最好的方案,但是实现起来较为复杂。因此,本实现使用借助Rust的标准库实现起来相对简单的小根堆进行优化:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

pub struct Timer(BinaryHeap<Reverse<u64>>);

impl Timer {
    pub fn with_capacity(capacity: usize) -> Self {
        Self(BinaryHeap::with_capacity(capacity))
    }

    pub fn schedule(&mut self, ts: u32, sn: u32) {
        self.0.push(Reverse(((ts as u64) << 32) | sn as u64));
    }

    /// 获取截止到now发生的 一个 事件,应该重复调用
    pub fn event(&mut self, now: u32) -> Option<(u32, u32)> {
        let key = (now as u64 + 1) << 32;
        match self.0.peek() {
            Some(&Reverse(val)) if val < key => {
                let sn = val & (u32::max_value() as u64);
                let ts = val >> 32;
                self.0.pop();
                Some((ts as u32, sn as u32))
            }
            _ => None,
        }
    }
}

计时器只需要存时间和分段序号即可。调用的代码如下:

fn flush_push(&mut self) {
    // ... 省去流控以及把队列里的分段加入发送窗口的部分
    let mut send_buf = std::mem::take(&mut self.send_buf);
    while let Some((ts, sn)) = self.timer.event(self.now) {
        if sn < self.send_una || sn >= self.send_nxt {
            continue; // 分段被ACK于是不在发送窗口里了,自然跳过
        }
        if let Some(seg) = send_buf.get_mut(sn as usize) {
            if ts == seg.ts {
                seg.ts = self.prepare_send(seg); // 更新RTO并计算下一次重传的时间
                seg.ts_last_send = ts;
                self.dead_link |= seg.sends >= self.config.dead_link_thres;
                self.flush_segment(Command::Push, seg.frg, seg.sn, ts, seg.payload.len());
                self.buffer.extend_from_slice(&seg.payload);
                self.timer.schedule(seg.ts, seg.sn); // 安排下一次重传
            }
        }
    }
    self.send_buf = send_buf;
}

由于查看小根堆堆顶是\mathcal{O}(1)的,因此在没有重传的时候flush_push的开销确实很小。足以在inputsend时都调用一次。真的要重传时,更新小根堆的时间复杂度也是对数级别的,这就给去除checkupdate提供了基础。

BBR

最后的改进是用BBR取代了KCP原版实现中朴素的基于丢包的流控算法。

我试图参照原论文实现BBR,但因为计时精度的问题packet pacing是做不到了。实现的部分有

  • 基于单调队列的滑动窗口BtlBw max-filter。
  • 基于单调队列的滑动窗口RTprop min-filter。
  • BBR状态机。
  • 基于以上三者计算inflight limit进行流控。

和BBR有分歧的一点在于在ProbeRTT状态采用BDP的一半作为拥塞窗口而不是原文的4个包。

此外,本实现只对只传输一次的分段计算BBR的各项参数,如RTT,带宽,更新各个filter等。原因是实际上大部分的分段都在看到包头的UNA之后就被ACK掉了而不是被单独的ACK包ACK的。ACK包带有分段的序号与时间戳,所以可以清楚知道ACK的是哪一次传输,但被UNA ACK掉的就不清楚,唯一的例外是分段只被传输了一次。如果对于多次传输的包仍然直接计算BBR,那么万一ACK恰好在重传之后到达,那么误算出的RTT就非常小,导致RTprop非常小,进而BDP非常小,整个BBR就堵住了。诚然,可以把每一次传输的时间戳都存起来,然后在被UNA时选择最接近now - srtt的传输,但这就增加了代码的复杂度。考虑到丢包的毕竟是少数,如此未必会有特别大的优势(实现这个的代码在这个commit,也可作为参考)。

实测效果还行。一个比较重要的问题是在间歇性flush的情况下对于带宽的计算并不准确甚至有低估的倾向,往往导致BDP过于保守,即使ProbeBW状态有一个1.25x的激进phase也解决不了问题。我想到的一个解决方案是将拥塞窗口不是简单的设置为BDP,而是乘上一个增益,也就是配置里面的bdp_gain。为了避免浮点数运算,bdp_gain使用1024为基数。一般来说设置成1280就差不多了——其实就是主动创造轻微的拥塞来确保占有带宽。这对于其他的TCP连接固然有些不公平的,是否采用见仁见智。

真·背景

其实一开始,只是为了编译的时候能够不带着unsafe和C编译器而选择把原版实现移植到Rust,当时代码大部分几乎一模一样。

后来引入BBR魔改了一回。

为了代码更Rust魔改了一回。

再后来做配置分离魔改了一回。

再后来数据结构优化又魔改了一回。

来来回回地改,到最后除了架子还和原版实现相似,内部的代码已经大变样了。

但是在ICMP隧道上试验下来仍然不是最令人满意,CPU占用仍然不少,带宽仍然不能跑满,goodput仍然不高。

我不知道是我应用层以及底层的代码写的有问题,还是KCP本身就不是很适合高流量大窗口的应用场景。

或许二者兼有之?

前两天突然想起了QUIC,找了一下,Cloudflare有一个优秀的QUIC实现,是Rust的,而且是Sans IO的。大概QUIC才是最适合我的应用情形的吧。我准备这几天试验一下。

或许之后就转QUIC了呢?(笑)

那我魔改的KCP就放在这吃灰?

于是乎,就有了这篇文章。