完整的代码在此

背景

最近我在用 Rust 写一个 ICMP 隧道,因为 ICMP 包本身是不可靠的,于是需要在 ICMP 之上写一个可靠协议。一个完整的 TCP 协议栈显然过于臃肿了(何况也并没有现成的无 IO 的轮子),所以我就看上了 skywind3000 大佬的 KCP 协议 —— 轻量、简洁、代码连我这种网路萌新都看得懂,实在是再好不过了。

一开始是我直接使用原版的 C 实现 + FFI 封装,在不开流控的情况下效果不错,但可惜原版的拥塞控制用的是最朴素的 TCP Tahoe(作者也表明出于简洁性考虑不准备在标准实现当中使用复杂的流控算法),其在国际网络环境下的表现实在差强人意。想改进,但自己对于自己 C 的编程水平实在是不抱信心。既然 Rust 也是性能一流的系统编程语言,我最终还是决定用 Rust 再写了一个实现。本实现具有以下特点:

  • 相较于 C 实现进行了架构上的些许调整。
  • 在 C 实现的基础之上,使用链表 + 滚动数组优化大窗口下的发送性能。
  • 在 C 实现的基础之上,使用小根堆优化 RTO 计时器的效率,提升重传性能。
  • 将著名的 BBR 拥塞控制算法进行一定修改后试验性地运用到 KCP 中。

依赖的包

为了使编写更加简便,我们的实现依赖以下 Rust crates:

  • bytes—— 简便的字节处理(代替原来 C 实现当中的 encode_xxx/decode_xxx)。
  • num_enum—— 简化 Rust 枚举与字节的互相转换。
  • derivative—— 简化一些 trait 的实现。
  • thiserror—— 简化错误类型的定义。
  • rand—— 用于 BBR 随机相位初始化。

架构上的调整

常量与配置

相对于 C 实现,本实现大幅减少了常量的数量。最后仅剩的常量有五:

/// KCP 包头大小
const OVERHEAD: u32 = 24;
/// 最大分段
const MAX_FRAGMENTS: u16 = 128;
/// KCP 段类型
#[derive(Debug, Clone, Copy, TryFromPrimitive, IntoPrimitive)]
#[repr(u8)]
enum Command {
    Push = 81,
    Ack = 82,
    AskWnd = 83,
    TellWnd = 84,
}
/// BBR 各阶段的增益
const BBR_GAIN_CYCLE: [usize; 8] = [5, 3, 4, 4, 4, 4, 4, 4];
/// BDP 增益的分母,见后文
const BDP_GAIN_DEN: usize = 1024;

常量少了,变量自然就多了,原来 C 实现的常量在本实现中成为可配置项:

/// 大部分配置的意思如字面
#[derive(Clone, Debug, Deserialize, Derivative)]
#[derivative(Default)]
pub struct Config {
    #[derivative(Default(value = "536"))]
    pub mtu: u32,
    #[derivative(Default(value = "200"))]
    pub rto_default: u32,
    #[derivative(Default(value = "100"))]
    pub rto_min: u32,
    #[derivative(Default(value = "6000"))]
    pub rto_max: u32,
    #[derivative(Default(value = "7000"))]
    pub probe_min: u32,
    #[derivative(Default(value = "120000"))]
    pub probe_max: u32,
    #[derivative(Default(value = "1024"))]
    pub send_wnd: u16,
    #[derivative(Default(value = "1024"))]
    pub recv_wnd: u16,
    #[derivative(Default(value = "40"))]
    pub interval: u32,
    /// 若一个包重传 dead_link_thres 次后依然失败,则视作底层链路失效。
    #[derivative(Default(value = "20"))]
    pub dead_link_thres: u32,
    /// nodelay 模式下,rto_min = 0 且 rto 在重传失败后不指数增长。
    #[derivative(Default(value = "false"))]
    pub nodelay: bool,
    /// stream 模式下,多个数据包可以被合并在同一段内从而减少开销。
    #[derivative(Default(value = "false"))]
    pub stream: bool,
    /// 如果指定,则一个包在 fast_resend_thres 个在其之后的包 ACK 之后会直接重传
    #[derivative(Default(value = "None"))]
    pub fast_resend_thres: Option<u32>,
    /// 快速重传的次数上限
    #[derivative(Default(value = "None"))]
    pub fast_resend_limit: Option<u32>,
    /// 是否启用 BBR 控制算法
    #[derivative(Default(value = "false"))]
    pub bbr: bool,
    /// BBR 中 RTprop(往返时间)滑动窗口的时间长度(单位:毫秒)
    #[derivative(Default(value = "10000"))]
    pub rt_prop_wnd: u32,
    /// BBR 中 BtlBw(瓶颈带宽)滑动串口的长度(单位:RTT)
    #[derivative(Default(value = "10"))]
    pub btl_bw_wnd: u32,
    /// BBR 中一次 RTT/RTprop 探测的时间(单位:RTT),减少该值可以减轻 RTT 探测对于流量的影响。
    #[derivative(Default(value = "200"))]
    pub probe_rtt_time: u32,
    /// BDP 增益,见后文
    #[derivative(Default(value = "1024"))]
    pub bdp_gain: usize,
}

impl Config {
    pub fn mss(&self) -> usize {
        (self.mtu - OVERHEAD) as usize
    }
}

impl ControlBlock {
    pub fn new(conv: u32, config: Config) -> ControlBlock {
        ...
    }
}

异常类型

KCP 原本的 C 实现仅使用负数表达异常值,虽简介但其含义并不明晰,在本实现中我们对于异常进行了清晰定义:

#[derive(Debug, Error)]
pub enum Error {
    #[error("packet to be sent too large to be fragmented")]
    OversizePacket,
    #[error("incomplete KCP packet")]
    IncompletePacket,
    #[error("invalid KCP command: {0}")]
    InvalidCommand(u8),
    #[error("empty queue (try again later)")]
    NotAvailable,
    #[error("wrong conv. (expected {expected}, found {found})")]
    WrongConv { expected: u32, found: u32 },
}

pub type Result<T> = std::result::Result<T, Error>;

以上异常类型还有更精确的空间,但是目前应该已经堪堪够用了。

发包方式

在原来的 C 实现在发包时直接调用 callback,其优点是简洁,但其缺点在于 callback 的运行时间不定以及异常处理不明对运行产生的影响。何况在 Rust 当中安全存储 callback 需要和 borrow checker 拼命。在本实现中,我们将 flush 出去的包暂存在一个队列中,然后通过外部不断 poll 的方式拉出去。一方面,主动 poll 的方式和底层收到包时的 push 呼应;另一方面,这有助于分离底层发包和 KCP 逻辑本身,是 “无 IO/Sans IO” 理念的一种体现。缺点是缓存队列可能会膨胀得厉害。当中 tradeoff 见仁见智。

impl ControlBlock {
    ...
    /// 底层收包 push
    pub fn input(&mut self, mut data: &[u8]) -> Result<usize> { ... }
    /// 底层发包 poll
    pub fn output(&mut self) -> Option<Vec<u8>> { ... }
}

去除 checkupdate

这是一个比较大胆的改动,未必适合所有情形。去除的原因是在数据结构的优化下计算重传、更新发送窗口的开销大幅度减小,已经可以在每一次调用 inputsend 的时候进行一次,没有必要去不断 checkupdate。上层只需要按照固定的时间间隔调用 flush 就行了。我进行这样的设计是为了简化上层的代码,而且我的应用情形恰好是高流量的反正都要一直 flush,也无所谓。

flush 的代码也其实很简单:

pub fn flush(&mut self) {
    self.sync_now(); // 更新 now
    self.flush_probe(); // 更新窗口探测
    self.flush_push(); // 计算重传以及更新发送窗口
    self.flush_ack(); // 发 ACK
    if !self.buffer.is_empty() {
        let mut new_buf = Vec::with_capacity(self.config.mtu as usize);
        std::mem::swap(&mut self.buffer, &mut new_buf);
        self.output.push_back(new_buf);
    }
}

既然 sync_nowflush_pushinputsend 当中都可以廉价地调用,那为什么还需要不断 checkupdate 呢?直接调用 flush 了事。

如果要参考有 checkupdate 的实现,可以参照早些时候的 commit

窗口数据结构的改进

KCP 原版的实现中发送 / 接收的队列 / 窗口全部使用队列作为数据结构,这固然使得代码变简单了,但也一定程度上降低了性能:在队列中查找 KCP 段最差需要线性时间,这在某些情形下未必是最优的。在本实现中,我们优化数据结构,以最优的复杂度实现发送 / 接受窗口需要的若干操作:

  • \(\mathcal{O}(1)\) 以分段的序号为键查找分段。
  • \(\mathcal{O}(1)\) 插入分段。
  • \(\mathcal{O}(1)\) 以分段的序号为键查找分段。
  • \(\mathcal{O}(1)\) 以分段的序号为键删除分段。
  • \(\mathcal{O}(1)\) 查询 / 弹出最早插入的分段(在发送窗口中,最早插入的分段自然是序号最小的分段)。
  • \(\mathcal{O}(k)\) 遍历以插入顺序为序,某分段的所有 \(k\) 个前驱(在发送窗口中,分段插入顺序即序号顺序,因此该操作可直接用于快速重传的计算)。
  • \(\mathcal{O}(1)\) 查询大小。

考虑到任何时刻窗口内分段序号之差不会大于窗口大小这一常数,符合上述要求的数据结构就可以用链表 + 滚动数组高效实现。代码不长,百行左右:

struct Element<T> {
    /// 前驱下标
    prev: usize,
    /// 后继下标
    next: usize,
    data: T,
}

pub struct Window<T> {
    size: usize,
    entry: Vec<Option<Element<T>>>,
    end: Option<usize>,
    len: usize,
}

impl<T> Window<T> {
    pub fn with_size(size: usize) -> Self {
        Self {
            size,
            entry: (0..size).map(|_| None).collect(),
            end: None,
            len: 0,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.end.is_none()
    }

    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
        match self.entry[index % self.size].as_mut() {
            Some(elem) => Some(&mut elem.data),
            None => None,
        }
    }

    pub fn push(&mut self, index: usize, data: T) {
        let index = index % self.size;
        if self.entry[index].is_some() {
            return;
        }
        self.entry[index] = Some(match self.end {
            Some(prev) => {
                let prev_elem = self.entry[prev].as_mut().unwrap();
                let next = prev_elem.next;
                prev_elem.next = index;
                self.entry[next].as_mut().unwrap().prev = index;
                Element { prev, next, data }
            }
            None => Element { prev: index, next: index, data },
        });
        self.end = Some(index);
        self.len += 1;
    }

    pub fn remove(&mut self, index: usize) -> Option<T> {
        let index = index % self.size;
        let elem = self.entry[index].take()?;
        let (prev, next) = (elem.prev, elem.next);
        self.entry[index] = None;
        self.len -= 1;
        if index == self.end.unwrap() {
            if prev == index {
                self.end = None;
                return Some(elem.data);
            } else {
                self.end = Some(prev);
            }
        }
        self.entry[prev].as_mut().unwrap().next = next;
        self.entry[next].as_mut().unwrap().prev = prev;
        Some(elem.data)
    }

    pub fn front(&self) -> Option<&T> {
        self.end.map(|end| {
            let head = self.entry[end].as_ref().unwrap().next;
            &self.entry[head].as_ref().unwrap().data
        })
    }

    pub fn pop_unchecked(&mut self) -> T {
        let end = self.end.unwrap();
        let head = self.entry[end].as_ref().unwrap().next;
        self.remove(head).unwrap()
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn for_preceding(&mut self, index: usize, mut action: impl FnMut(&mut T)) {
        let mut index = index % self.size;
        index = match self.entry[index].as_ref() {
            Some(elem) => elem.prev,
            None => return,
        };
        while index != self.end.unwrap() {
            let elem = self.entry[index].as_mut().unwrap();
            action(&mut elem.data);
            index = elem.prev;
        }
    }
}

因为滚动数组是连续空间,在内存布局上相较于链表对于缓存更加友好,所以速度应该还可以再快一点。唯一的不足是 unwrap 有点多看着心惊肉跳,并且用指针可能会比用下标快一丁点,但是用 Rust 写数据结构大约就是这个尿性。

有这个打底,窗口大小开到 8192 实测是一点问题都没有的,更大的没试过。

但 KCP 的设计本质上是不适合大流量的,因为快速重传无论如何优化数据结构最坏的线性复杂度就在那里无法消除,除非可以限制快速重传向后看的范围,但后者又削弱了快速重传的意义与效用。

重传计时器的改进

原版的 KCP 实现在 check 的时候需要遍历发送窗口来确定最近的重传时间,在 flush 的时候又要遍历才能重传,这在窗口较大的时候显然是比较吃性能的。原作者记得在 issues 里的讨论中提过可以用时间轮进行优化。诚然,时间轮是最好的方案,但是实现起来较为复杂。因此,本实现使用借助 Rust 的标准库实现起来相对简单的小根堆进行优化:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

pub struct Timer(BinaryHeap<Reverse<u64>>);

impl Timer {
    pub fn with_capacity(capacity: usize) -> Self {
        Self(BinaryHeap::with_capacity(capacity))
    }

    pub fn schedule(&mut self, ts: u32, sn: u32) {
        self.0.push(Reverse(((ts as u64) << 32) | sn as u64));
    }

    /// 获取截止到 now 发生的 一个 事件,应该重复调用
    pub fn event(&mut self, now: u32) -> Option<(u32, u32)> {
        let key = (now as u64 + 1) << 32;
        match self.0.peek() {
            Some(&Reverse(val)) if val < key => {
                let sn = val & (u32::max_value() as u64);
                let ts = val >> 32;
                self.0.pop();
                Some((ts as u32, sn as u32))
            }
            _ => None,
        }
    }
}

计时器只需要存时间和分段序号即可。调用的代码如下:

fn flush_push(&mut self) {
    // ... 省去流控以及把队列里的分段加入发送窗口的部分
    let mut send_buf = std::mem::take(&mut self.send_buf);
    while let Some((ts, sn)) = self.timer.event(self.now) {
        if sn < self.send_una || sn >= self.send_nxt {
            continue; // 分段被 ACK 于是不在发送窗口里了,自然跳过
        }
        if let Some(seg) = send_buf.get_mut(sn as usize) {
            if ts == seg.ts {
                seg.ts = self.prepare_send(seg); // 更新 RTO 并计算下一次重传的时间
                seg.ts_last_send = ts;
                self.dead_link |= seg.sends >= self.config.dead_link_thres;
                self.flush_segment(Command::Push, seg.frg, seg.sn, ts, seg.payload.len());
                self.buffer.extend_from_slice(&seg.payload);
                self.timer.schedule(seg.ts, seg.sn); // 安排下一次重传
            }
        }
    }
    self.send_buf = send_buf;
}

由于查看小根堆堆顶是 \(\mathcal{O}(1)\) 的,因此在没有重传的时候 flush_push 的开销确实很小。足以在 inputsend 时都调用一次。真的要重传时,更新小根堆的时间复杂度也是对数级别的,这就给去除 checkupdate 提供了基础。

BBR

最后的改进是用 BBR 取代了 KCP 原版实现中朴素的基于丢包的流控算法。

我试图参照原论文实现 BBR,但因为计时精度的问题 packet pacing 是做不到了。实现的部分有

  • 基于单调队列的滑动窗口 BtlBw max-filter。
  • 基于单调队列的滑动窗口 RTprop min-filter。
  • BBR 状态机。
  • 基于以上三者计算 inflight limit 进行流控。

和 BBR 有分歧的一点在于在 ProbeRTT 状态采用 BDP 的一半作为拥塞窗口而不是原文的 4 个包。

此外,本实现只对只传输一次的分段计算 BBR 的各项参数,如 RTT,带宽,更新各个 filter 等。原因是实际上大部分的分段都在看到包头的 UNA 之后就被 ACK 掉了而不是被单独的 ACK 包 ACK 的。ACK 包带有分段的序号与时间戳,所以可以清楚知道 ACK 的是哪一次传输,但被 UNA ACK 掉的就不清楚,唯一的例外是分段只被传输了一次。如果对于多次传输的包仍然直接计算 BBR,那么万一 ACK 恰好在重传之后到达,那么误算出的 RTT 就非常小,导致 RTprop 非常小,进而 BDP 非常小,整个 BBR 就堵住了。诚然,可以把每一次传输的时间戳都存起来,然后在被 UNA 时选择最接近 now - srtt 的传输,但这就增加了代码的复杂度。考虑到丢包的毕竟是少数,如此未必会有特别大的优势(实现这个的代码在这个 commit,也可作为参考)。

实测效果还行。一个比较重要的问题是在间歇性 flush 的情况下对于带宽的计算并不准确甚至有低估的倾向,往往导致 BDP 过于保守,即使 ProbeBW 状态有一个 1.25x 的激进 phase 也解决不了问题。我想到的一个解决方案是将拥塞窗口不是简单的设置为 BDP,而是乘上一个增益,也就是配置里面的 bdp_gain。为了避免浮点数运算,bdp_gain 使用 1024 为基数。一般来说设置成 1280 就差不多了 —— 其实就是主动创造轻微的拥塞来确保占有带宽。这对于其他的 TCP 连接固然有些不公平的,是否采用见仁见智。

真・背景

其实一开始,只是为了编译的时候能够不带着 unsafe 和 C 编译器而选择把原版实现移植到 Rust,当时代码大部分几乎一模一样。

后来引入 BBR 魔改了一回。

为了代码更 Rust 魔改了一回。

再后来做配置分离魔改了一回。

再后来数据结构优化又魔改了一回。

来来回回地改,到最后除了架子还和原版实现相似,内部的代码已经大变样了。

但是在 ICMP 隧道上试验下来仍然不是最令人满意,CPU 占用仍然不少,带宽仍然不能跑满,goodput 仍然不高。

我不知道是我应用层以及底层的代码写的有问题,还是 KCP 本身就不是很适合高流量大窗口的应用场景。

或许二者兼有之?

前两天突然想起了 QUIC,找了一下,Cloudflare 有一个优秀的 QUIC 实现,是 Rust 的,而且是 Sans IO 的。大概 QUIC 才是最适合我的应用情形的吧。我准备这几天试验一下。

或许之后就转 QUIC 了呢?(笑)

那我魔改的 KCP 就放在这吃灰?

于是乎,就有了这篇文章。