P4 10-Congestion Aware Load Balancing

P4示例程序-10 负载均衡与带内遥测

  • 功能

    基于ECMP的智能负载均衡和自适应拥塞探测方案:
  1. 基础功能:基于ECMP实现负载均衡;
  2. 带内遥测:收集队列的排队信息,将信息封装在数据包里传递,并返回回馈包;
  3. 动态绕路:当检测到路径上出现严重拥塞,就通知上图中的S1改变哈希的种子值,将流量通过其它路径传输。

流程如下: 1. egress阶段检测到出端口的排队深度太大:克隆出一个反馈包,并recirculate到ingress阶段; 2. ingress接收到反馈包:反馈包作为正常数据包进行转发; 3. 首个交换机(例如上图中的交换机S1)接收到反馈包:调整ECMP哈希函数的种子值。

  • 拓扑结构

拓扑结构
  • 代码

  1. p4app.json
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    {
    "p4_src": "p4src/loadbalancer.p4",
    "cli": true,
    "pcap_dump": true,
    "enable_log": true,
    "exec_scripts": [
    {
    "cmd": "python routing-controller.py",
    "reboot_run": true
    }
    ],
    "topology": {
    "assignment_strategy": "l3",
    "default":
    {
    "bw": 10
    },
    "links": [
    ["h1", "s1"], ["h2", "s1"], ["h3", "s4"], ["h4", "s4"], ["s1", "s2"], ["s1", "s3"] , ["s2", "s4"], ["s3", "s4"]],
    "hosts": {
    "h1": {
    },
    "h2": {
    },
    "h3": {
    },
    "h4": {
    }
    },
    "switches": {
    "s1": {
    },
    "s2": {
    },
    "s3": {
    },
    "s4": {
    }
    }
    }
    }
  2. Headers
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    const bit<16> TYPE_IPV4 = 0x800; // 定义以太网类型字段,IPv4协议的值
    const bit<16> TYPE_TELEMETRY = 0x7777; // 自定义遥测报文类型标识
    const bit<16> TYPE_FEEDBACK = 0x7778; // 自定义反馈报文类型标识

    const bit<4> TYPE_EGRESS_HOST = 1; // 出口为主机类型标识
    const bit<4> TYPE_EGRESS_SWITCH = 2; // 出口为交换机类型标识

    typedef bit<9> egressSpec_t; // 定义9位宽的出口端口类型
    typedef bit<48> macAddr_t; // 定义48位宽的MAC地址类型
    typedef bit<32> ip4Addr_t; // 定义32位宽的IPv4地址类型

    header ethernet_t { // 定义以太网头部结构
    macAddr_t dstAddr; // 目的MAC地址
    macAddr_t srcAddr; // 源MAC地址
    bit<16> etherType; // 以太网类型字段,标识下一层协议
    }

    header telemetry_t { // 定义遥测头部结构
    bit<16> enq_qdepth; // 入队列深度
    bit<16> nextHeaderType; // 下一个协议类型字段
    }

    header ipv4_t { // 定义IPv4头部结构
    bit<4> version; // IP版本号,通常为4
    bit<4> ihl; // 首部长度
    bit<8> tos; // 服务类型(类型服务字段)
    bit<16> totalLen; // 总长度
    bit<16> identification; // 标识符
    bit<3> flags; // 标志位
    bit<13> fragOffset; // 分片偏移
    bit<8> ttl; // 生存时间
    bit<8> protocol; // 上层协议号
    bit<16> hdrChecksum; // 头部校验和
    ip4Addr_t srcAddr; // 源IP地址
    ip4Addr_t dstAddr; // 目的IP地址
    }

    header tcp_t{ // 定义TCP头部结构
    bit<16> srcPort; // 源端口号
    bit<16> dstPort; // 目的端口号
    bit<32> seqNo; // 序列号
    bit<32> ackNo; // 确认号
    bit<4> dataOffset; // 数据偏移,TCP头部长度
    bit<4> res; // 保留字段
    bit<1> cwr; // 拥塞窗口减少标志
    bit<1> ece; // ECN回显标志
    bit<1> urg; // 紧急指针有效标志
    bit<1> ack; // 确认号有效标志
    bit<1> psh; // 推送功能标志
    bit<1> rst; // 连接重置标志
    bit<1> syn; // 同步序列号标志
    bit<1> fin; // 结束标志
    bit<16> window; // 窗口大小
    bit<16> checksum; // 校验和
    bit<16> urgentPtr; // 紧急指针
    }

    struct feedback_t { // 反馈结构体定义(当前为空)
    }

    struct metadata { // 定义元数据结构
    bit<14> ecmp_hash; // ECMP哈希值
    bit<14> ecmp_group_id; // ECMP组ID
    bit<4> egress_type; // 出口类型
    bit<48> feedback_ts; // 反馈时间戳
    bit<12> feedback_register_index; // 反馈寄存器索引
    @field_list(0) // 反馈字段列表,索引为0(注解)
    feedback_t feedback; // 反馈信息结构体
    }

    struct headers { // 定义报文头集合结构
    ethernet_t ethernet; // 以太网头
    telemetry_t telemetry; // 遥测头
    ipv4_t ipv4; // IPv4头
    tcp_t tcp; // TCP头
    }

  3. Parser
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    parser MyParser(packet_in packet, // 定义解析器,输入为packet
    out headers hdr, // 输出解析出的headers
    inout metadata meta, // 传入并修改的元数据
    inout standard_metadata_t standard_metadata) { // 标准元数据输入输出

    state start { // 解析起始状态
    transition parse_ethernet; // 转到解析以太网头
    }

    state parse_ethernet { // 解析以太网头状态
    packet.extract(hdr.ethernet); // 从包中提取以太网头
    transition select(hdr.ethernet.etherType){ // 根据以太网类型字段选择下一状态
    TYPE_IPV4: parse_ipv4; // 如果是IPv4,进入IPv4解析状态
    TYPE_TELEMETRY: parse_telemetry; // 如果是遥测报文,进入遥测解析状态
    TYPE_FEEDBACK: parse_ipv4; // 反馈类型当作IPv4解析
    default: accept; // 其它类型直接接受,停止解析
    }
    }

    state parse_telemetry { // 解析遥测头状态
    packet.extract(hdr.telemetry); // 提取遥测头
    transition select(hdr.telemetry.nextHeaderType){ // 根据遥测头中下一协议字段选择
    TYPE_IPV4: parse_ipv4; // 如果是IPv4,继续解析IPv4头
    default: accept; // 否则结束解析
    }
    }

    state parse_ipv4 { // 解析IPv4头状态
    packet.extract(hdr.ipv4); // 提取IPv4头
    transition select(hdr.ipv4.protocol){ // 根据IPv4中的协议字段选择
    6 : parse_tcp; // 协议号6为TCP,继续解析TCP头
    default: accept; // 其他协议直接结束解析
    }
    }

    state parse_tcp { // 解析TCP头状态
    packet.extract(hdr.tcp); // 提取TCP头
    transition accept; // 解析结束,接受包
    }
    }
  4. Checksum Verification
    1
    2
    3
    4
    control MyVerifyChecksum(inout headers hdr, inout metadata meta) { // 校验和验证控制块,输入输出头部和元数据
    apply {
    }
    }
  5. Ingress Processing
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    control MyIngress(inout headers hdr, // ingress处理控制块,输入输出头部
    inout metadata meta, // 输入输出元数据
    inout standard_metadata_t standard_metadata) { // 标准元数据输入输出

    register <bit<REGISTER_WIDTH>>(REGISTER_SIZE) loadbalance_seed; // 定义负载均衡种子寄存器,宽度32位,大小1024

    action drop() { // 丢弃包动作
    mark_to_drop(standard_metadata); // 标记包为丢弃
    }

    action set_egress_type (bit<4> egress_type){ // 设置出口类型动作,传入4位类型码
    meta.egress_type = egress_type; // 将传入值写入元数据egress_type
    }

    table egress_type { // 定义出口类型匹配表
    key = {
    standard_metadata.egress_spec: exact; // 精确匹配出口端口号
    }

    actions = {
    set_egress_type; // 设置出口类型动作
    NoAction; // 不执行任何动作
    }
    size=64; // 表大小为64条
    default_action = NoAction; // 默认不执行任何动作
    }

    action update_flow_seed(){ // 更新流种子动作
    bit<12> register_index; // 12位寄存器索引变量
    bit<32> seed; // 32位随机种子变量
    random(seed, (bit<32>)0, (bit<32>)1234567); // 生成0到1234567间随机数写入seed

    hash(register_index, // 计算寄存器索引hash值
    HashAlgorithm.crc16, // 采用CRC16算法
    (bit<1>)0, // 初始种子0
    { hdr.ipv4.dstAddr, // hash输入字段:目的IP
    hdr.ipv4.srcAddr, // 源IP
    hdr.tcp.srcPort, // TCP源端口
    hdr.tcp.dstPort, // TCP目的端口
    hdr.ipv4.protocol}, // IP协议号
    (bit<12>)REGISTER_SIZE); // 输出范围为REGISTER_SIZE(1024)

    loadbalance_seed.write((bit<32>)register_index, seed); // 将随机种子写入负载均衡寄存器对应位置
    }

    action ecmp_group(bit<14> ecmp_group_id, bit<16> num_nhops){ // ECMP分组动作,传入组ID和下一跳数

    bit<12> register_index; // 12位寄存器索引变量
    bit<32> seed; // 32位随机种子变量

    hash(register_index, // 计算寄存器索引hash值
    HashAlgorithm.crc16, // CRC16算法
    (bit<1>)0, // 初始种子0
    { hdr.ipv4.srcAddr, // hash输入字段:源IP
    hdr.ipv4.dstAddr, // 目的IP
    hdr.tcp.srcPort, // TCP源端口
    hdr.tcp.dstPort, // TCP目的端口
    hdr.ipv4.protocol}, // IP协议号
    (bit<12>)REGISTER_SIZE); // 输出范围1024

    loadbalance_seed.read(seed, (bit<32>)register_index); // 从寄存器读取对应随机种子

    hash(meta.ecmp_hash, // 计算最终ECMP哈希值
    HashAlgorithm.crc16, // CRC16算法
    (bit<1>)0, // 初始种子0
    { hdr.ipv4.srcAddr, // hash输入字段:源IP
    hdr.ipv4.dstAddr, // 目的IP
    hdr.tcp.srcPort, // TCP源端口
    hdr.tcp.dstPort, // TCP目的端口
    hdr.ipv4.protocol, // IP协议号
    seed}, // 加入负载均衡随机种子增强hash随机性
    num_nhops); // hash结果模num_nhops作为选择
    meta.ecmp_group_id = ecmp_group_id; // 设置元数据ECMP组ID
    }

    action set_nhop(macAddr_t dstAddr, egressSpec_t port) { // 设置下一跳动作,传入下一跳MAC和出口端口

    //set the src mac address as the previous dst, this is not correct right?
    hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 将以太网源MAC设置为当前目的MAC(注释质疑正确性)

    //set the destination mac address that we got from the match in the table
    hdr.ethernet.dstAddr = dstAddr; // 设置目的MAC地址为查表得到的下一跳MAC

    //set the output port that we also get from the table
    standard_metadata.egress_spec = port; // 设置出口端口为查表得到的端口

    //decrease ttl by 1
    hdr.ipv4.ttl = hdr.ipv4.ttl - 1; // IPv4生存时间减1,防止环路
    }

    table ecmp_group_to_nhop { // ECMP组到下一跳映射表
    key = {
    meta.ecmp_group_id: exact; // 精确匹配ECMP组ID
    meta.ecmp_hash: exact; // 精确匹配ECMP哈希值
    }
    actions = {
    drop; // 丢弃包动作
    set_nhop; // 设置下一跳动作
    }
    size = 1024; // 表大小1024
    }

    table ipv4_lpm { // IPv4最长前缀匹配表,用于路由查找
    key = {
    hdr.ipv4.dstAddr: lpm; // 以目的IP做最长前缀匹配
    }
    actions = {
    set_nhop; // 设置下一跳动作
    ecmp_group; // ECMP分组动作
    drop; // 丢弃动作
    }
    size = 1024; // 表大小1024
    default_action = drop; // 默认丢弃
    }

    apply { // 主体处理逻辑

    if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_INGRESS_RECIRC){ // 如果是ingress重循环包
    bit<32> src_ip = hdr.ipv4.srcAddr; // 保存源IP
    hdr.ipv4.srcAddr = hdr.ipv4.dstAddr; // 交换源和目的IP
    hdr.ipv4.dstAddr = src_ip;
    hdr.ethernet.etherType = TYPE_FEEDBACK; // 以太网类型改为反馈类型
    }

    //Only forward packets if they are IP and TTL > 1
    if (hdr.ipv4.isValid() && hdr.ipv4.ttl > 1){ // 仅处理有效IPv4包且TTL大于1的包
    switch (ipv4_lpm.apply().action_run){ // 应用IPv4最长前缀匹配表
    ecmp_group: { // 如果执行动作为ECMP分组
    ecmp_group_to_nhop.apply(); // 执行ECMP组到下一跳表
    }
    }
    }

    egress_type.apply(); // 应用出口类型匹配表

    if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_NORMAL && hdr.ethernet.etherType == TYPE_FEEDBACK && meta.egress_type == TYPE_EGRESS_HOST){
    // 如果是普通包,类型是反馈,且出口为主机类型
    update_flow_seed(); // 更新流负载均衡种子
    drop(); // 丢弃该包
    }

    }
    }

  6. Egress Processing
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    control MyEgress(inout headers hdr, // 出口处理控制块
    inout metadata meta, // 元数据
    inout standard_metadata_t standard_metadata) { // 标准元数据

    register <bit<48>>(REGISTER_SIZE) feedback_ts; // 反馈时间戳寄存器,宽度48位,大小1024

    action read_feedback_ts(){ // 读取反馈时间戳动作

    hash(meta.feedback_register_index, // 计算反馈寄存器索引hash值
    HashAlgorithm.crc16,
    (bit<1>)0,
    { hdr.ipv4.srcAddr,
    hdr.ipv4.dstAddr,
    hdr.tcp.srcPort,
    hdr.tcp.dstPort,
    hdr.ipv4.protocol},
    (bit<12>)REGISTER_SIZE);

    feedback_ts.read(meta.feedback_ts, (bit<32>)meta.feedback_register_index); // 从寄存器读取时间戳
    }

    apply { // 出口处理主逻辑
    //Cloned packet, used to generate probe
    if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_EGRESS_CLONE){ // 如果是egress克隆包
    recirculate_preserving_field_list(0); // 重新循环报文,保持字段列表0的内容
    }

    else if (standard_metadata.instance_type == PKT_INSTANCE_TYPE_NORMAL && hdr.ethernet.etherType != TYPE_FEEDBACK) {
    // 普通包且不是反馈包时处理

    if (hdr.tcp.isValid()){ // TCP头有效
    if (hdr.telemetry.isValid()){ // 遥测头有效
    if (hdr.telemetry.enq_qdepth < (bit<16>)standard_metadata.enq_qdepth && meta.egress_type == TYPE_EGRESS_SWITCH){
    // 如果当前遥测队列深度小于当前排队深度,且出口为交换机
    hdr.telemetry.enq_qdepth = (bit<16>)standard_metadata.enq_qdepth; // 更新遥测队列深度
    }
    //If egresss. We do not update the queue of the last hop because this can not be changed anyways.
    else if (meta.egress_type == TYPE_EGRESS_HOST){
    // 出口为主机时,不更新最后一跳队列深度
    hdr.ethernet.etherType = TYPE_IPV4; // 恢复以太网类型为IPv4
    hdr.telemetry.setInvalid(); // 标记遥测头无效

    //clone packet if above threshold
    if (hdr.telemetry.enq_qdepth > 50){ // 队列深度超过阈值时克隆探针包
    read_feedback_ts(); // 读取反馈时间戳
    bit<48> backoff;
    random(backoff, 48w500000, 48w1000000); // 生成随机后备时间,500ms到1s
    if ((standard_metadata.ingress_global_timestamp - meta.feedback_ts) > backoff){
    feedback_ts.write((bit<32>)meta.feedback_register_index, standard_metadata.ingress_global_timestamp);
    bit<8> probability;
    random(probability, 8w0, 8w3); // 生成0~3的随机数
    if (probability == 0) { // 25%概率执行克隆
    clone(CloneType.E2E, 100); // 克隆包,优先级100
    }
    }
    }
    }
    }
    else {
    //If ingress and next hop is a switch
    if (meta.egress_type == TYPE_EGRESS_SWITCH){ // 遥测头无效,且出口是交换机时
    hdr.telemetry.setValid(); // 标记遥测头有效
    hdr.telemetry.enq_qdepth = (bit<16>)standard_metadata.enq_qdepth; // 设置遥测队列深度
    hdr.ethernet.etherType = TYPE_TELEMETRY; // 以太网类型设置为遥测类型
    hdr.telemetry.nextHeaderType = TYPE_IPV4; // 遥测头指明下一层为IPv4
    }
    }
    }
    }
    }
    }
  7. Checksum Computation
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    control MyComputeChecksum(inout headers hdr, inout metadata meta) { // 校验和计算控制块
    apply {
    update_checksum(
    hdr.ipv4.isValid(), // IPv4头有效才计算
    { hdr.ipv4.version, // 计算校验和的字段列表,包含IPv4头部重要字段
    hdr.ipv4.ihl,
    hdr.ipv4.tos,
    hdr.ipv4.totalLen,
    hdr.ipv4.identification,
    hdr.ipv4.flags,
    hdr.ipv4.fragOffset,
    hdr.ipv4.ttl,
    hdr.ipv4.protocol,
    hdr.ipv4.srcAddr,
    hdr.ipv4.dstAddr},
    hdr.ipv4.hdrChecksum, // 计算结果写入头部校验和字段
    HashAlgorithm.csum16); // 采用16位校验和算法

    }
    }
  8. DE parser
    1
    2
    3
    4
    5
    6
    7
    8
    9
    control MyDeparser(packet_out packet, in headers hdr) { // 出口解解析器,将头部重新打包
    apply {

    packet.emit(hdr.ethernet); // 发送以太网头
    packet.emit(hdr.telemetry); // 发送遥测头
    packet.emit(hdr.ipv4); // 发送IPv4头
    packet.emit(hdr.tcp); // 发送TCP头
    }
    }
  9. Switch
    1
    2
    3
    4
    5
    6
    7
    8
    V1Switch( 
    MyParser(),
    MyVerifyChecksum(),
    MyIngress(),
    MyEgress(),
    MyComputeChecksum(),
    MyDeparser()
    ) main;
  10. routing-controller.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    from p4utils.utils.helper import load_topo  # 导入加载拓扑的函数
    from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI # 导入简单交换机Thrift API类

    class RoutingController(object): # 定义路由控制器类

    def __init__(self): # 构造函数初始化

    self.topo = load_topo('topology.json') # 加载网络拓扑文件
    self.controllers = {} # 初始化一个字典保存交换机控制器对象
    self.init() # 调用初始化方法

    def init(self): # 初始化流程
    self.connect_to_switches() # 连接所有交换机的控制接口
    self.reset_states() # 重置所有交换机状态
    self.set_table_defaults() # 设置表的默认行为

    def reset_states(self): # 重置状态方法
    [controller.reset_state() for controller in self.controllers.values()] # 遍历所有控制器调用reset_state

    def connect_to_switches(self): # 连接交换机控制接口
    for p4switch in self.topo.get_p4switches(): # 遍历拓扑中所有P4交换机
    thrift_port = self.topo.get_thrift_port(p4switch) # 获取交换机对应的thrift端口号
    self.controllers[p4switch] = SimpleSwitchThriftAPI(thrift_port) # 用thrift端口初始化控制器对象并保存

    def set_table_defaults(self): # 设置所有交换机表的默认动作
    for controller in self.controllers.values(): # 遍历所有控制器
    controller.table_set_default("ipv4_lpm", "drop", []) # ipv4_lpm表默认动作设为丢弃
    controller.table_set_default("ecmp_group_to_nhop", "drop", []) # ecmp_group_to_nhop表默认动作设为丢弃

    def add_mirroring_ids(self): # 为所有交换机添加镜像ID

    for sw_name, controller in self.controllers.items(): # 遍历所有交换机控制器
    controller.mirroring_add(100, 1) # 添加镜像ID为100,端口1

    def set_egress_type_table(self): # 设置出口类型表

    for sw_name, controller in self.controllers.items(): # 遍历所有交换机
    for intf, node in self.topo.get_interfaces_to_node(sw_name).items(): # 遍历交换机接口对应的节点
    port_number = self.topo.interface_to_port(sw_name, intf) # 获取接口对应端口号

    if self.topo.isHost(node): # 如果节点是主机
    node_type_num = 1 # 类型编码为1
    elif self.topo.isP4Switch(node): # 如果节点是P4交换机
    node_type_num = 2 # 类型编码为2

    print("table_add at {}:".format(sw_name)) # 打印操作信息
    self.controllers[sw_name].table_add("egress_type", "set_egress_type", [str(port_number)], [str(node_type_num)])
    # 添加egress_type表项,匹配端口号,设置对应出口类型

    def route(self): # 路由表配置函数

    switch_ecmp_groups = {sw_name:{} for sw_name in self.topo.get_p4switches().keys()}
    # 初始化每台交换机的ECMP组字典,key为交换机名,值为空字典

    for sw_name, controller in self.controllers.items(): # 遍历所有交换机控制器
    for sw_dst in self.topo.get_p4switches(): # 遍历所有目的交换机

    #if its ourselves we create direct connections
    if sw_name == sw_dst: # 如果源交换机和目的交换机是同一个(直连情况)
    for host in self.topo.get_hosts_connected_to(sw_name): # 遍历连接到该交换机的主机
    sw_port = self.topo.node_to_node_port_num(sw_name, host) # 获取交换机到主机的端口号
    host_ip = self.topo.get_host_ip(host) + "/32" # 获取主机IP,设置子网掩码为32位
    host_mac = self.topo.get_host_mac(host) # 获取主机MAC地址

    #add rule
    print("table_add at {}:".format(sw_name)) # 打印添加表项信息
    self.controllers[sw_name].table_add("ipv4_lpm", "set_nhop", [str(host_ip)], [str(host_mac), str(sw_port)])
    # 在ipv4_lpm表添加直连主机下一跳条目

    #check if there are directly connected hosts
    else: # 如果目的交换机和源交换机不同(多跳情况)
    if self.topo.get_hosts_connected_to(sw_dst): # 如果目的交换机有连接主机
    paths = self.topo.get_shortest_paths_between_nodes(sw_name, sw_dst) # 获取源交换机到目的交换机所有最短路径
    for host in self.topo.get_hosts_connected_to(sw_dst): # 遍历目的交换机连接的主机

    if len(paths) == 1: # 如果只有一条最短路径
    next_hop = paths[0][1] # 下一跳节点为路径上的第二个节点

    host_ip = self.topo.get_host_ip(host) + "/24" # 获取主机IP,子网掩码24位
    sw_port = self.topo.node_to_node_port_num(sw_name, next_hop) # 获取源交换机到下一跳端口号
    dst_sw_mac = self.topo.node_to_node_mac(next_hop, sw_name) # 获取下一跳到源交换机的MAC地址

    #add rule
    print("table_add at {}:".format(sw_name)) # 打印信息
    self.controllers[sw_name].table_add("ipv4_lpm", "set_nhop", [str(host_ip)],
    [str(dst_sw_mac), str(sw_port)])
    # 添加下一跳规则

    elif len(paths) > 1: # 多条最短路径(ECMP)
    next_hops = [x[1] for x in paths] # 提取所有下一跳节点列表
    dst_macs_ports = [(self.topo.node_to_node_mac(next_hop, sw_name),
    self.topo.node_to_node_port_num(sw_name, next_hop))
    for next_hop in next_hops] # 获取每个下一跳的MAC和端口号
    host_ip = self.topo.get_host_ip(host) + "/24" # 主机IP/24掩码

    #check if the ecmp group already exists. The ecmp group is defined by the number of next
    #ports used, thus we can use dst_macs_ports as key
    if switch_ecmp_groups[sw_name].get(tuple(dst_macs_ports), None): # 判断该ECMP组是否已存在
    ecmp_group_id = switch_ecmp_groups[sw_name].get(tuple(dst_macs_ports), None) # 取出组ID
    print("table_add at {}:".format(sw_name)) # 打印信息
    self.controllers[sw_name].table_add("ipv4_lpm", "ecmp_group", [str(host_ip)],
    [str(ecmp_group_id), str(len(dst_macs_ports))])
    # 添加ECMP分组路由规则

    #new ecmp group for this switch
    else: # 新的ECMP组
    new_ecmp_group_id = len(switch_ecmp_groups[sw_name]) + 1 # 新组ID为当前组数加1
    switch_ecmp_groups[sw_name][tuple(dst_macs_ports)] = new_ecmp_group_id # 记录新组

    #add group
    for i, (mac, port) in enumerate(dst_macs_ports): # 遍历每个下一跳MAC和端口
    print("table_add at {}:".format(sw_name)) # 打印信息
    self.controllers[sw_name].table_add("ecmp_group_to_nhop", "set_nhop",
    [str(new_ecmp_group_id), str(i)],
    [str(mac), str(port)])
    # 添加ecmp_group_to_nhop表,映射组和端口索引到具体下一跳MAC端口

    #add forwarding rule
    print("table_add at {}:".format(sw_name)) # 打印信息
    self.controllers[sw_name].table_add("ipv4_lpm", "ecmp_group", [str(host_ip)],
    [str(new_ecmp_group_id), str(len(dst_macs_ports))])
    # 添加最终转发规则,指定ECMP组和下一跳数


    def main(self): # 主函数入口
    self.set_egress_type_table() # 设置出口类型表
    self.add_mirroring_ids() # 添加镜像ID
    self.route() # 执行路由配置


    if __name__ == "__main__": # 程序入口判断
    controller = RoutingController().main() # 创建控制器实例并调用主函数

  11. send.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    #!/usr/bin/env python3
    import argparse
    import sys
    import socket
    import random
    import struct

    from scapy.all import sendp, get_if_list, get_if_hwaddr
    from scapy.all import Ether, IP, UDP, TCP
    import time

    def get_if():
    ifs=get_if_list()
    iface=None # "h1-eth0"
    for i in get_if_list():
    if "eth0" in i:
    iface=i
    break
    if not iface:
    print("Cannot find eth0 interface")
    exit(1)
    return iface

    def main():

    if len(sys.argv)<3:
    print('pass 2 arguments: <destination> <number_of_random_packets>')
    exit(1)

    addr = socket.gethostbyname(sys.argv[1])
    iface = get_if()

    print("sending on interface %s to %s" % (iface, str(addr)))

    for _ in range(int(sys.argv[2])):
    pkt = Ether(src=get_if_hwaddr(iface), dst='ff:ff:ff:ff:ff:ff')
    pkt = pkt /IP(dst=addr) / TCP(dport=7777, sport=random.randint(2000,65535))
    sendp(pkt, iface=iface, verbose=False)
    time.sleep(0.1)

    if __name__ == '__main__':
    main()
  12. receive.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    #!/usr/bin/env python3
    import sys
    import os

    from scapy.all import sniff, get_if_list, Ether, get_if_hwaddr, IP, Raw, Packet, BitField, bind_layers

    def get_if():
    iface=None
    for i in get_if_list():
    if "eth0" in i:
    iface=i
    break
    if not iface:
    print("Cannot find eth0 interface")
    exit(1)
    return iface

    class Telemetry(Packet):
    fields_desc = [ BitField("enq_depth", 0, 16),
    #BitField("deq_depth", 0, 16),
    BitField("nextHeaderType", 0, 16)]

    def isNotOutgoing(my_mac):
    my_mac = my_mac
    def _isNotOutgoing(pkt):
    return pkt[Ether].src != my_mac

    return _isNotOutgoing

    def handle_pkt(pkt):

    ether = pkt.getlayer(Ether)

    telemetry = pkt.getlayer(Telemetry)
    print("Queue Info:")
    print("enq_depth", telemetry.enq_depth)
    #print "deq_depth", telemetry.deq_depth
    print()

    bind_layers(Ether, Telemetry, type=0x7777)


    def main():
    ifaces = [i for i in os.listdir('/sys/class/net/') if 'eth' in i]
    iface = ifaces[0]
    print("sniffing on %s" % iface)
    sys.stdout.flush()

    my_filter = isNotOutgoing(get_if_hwaddr(get_if()))

    sniff(filter="ether proto 0x7777", iface = iface,
    prn = lambda x: handle_pkt(x), lfilter=my_filter)

    if __name__ == '__main__':
    main()
  13. send_traffic.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    mport sys
    import random
    import time
    from p4utils.utils.helper import load_topo
    from subprocess import Popen

    topo = load_topo('topology.json')

    iperf_send = "mx {0} iperf3 -c {1} -M 9000 -t {2} --bind {3} --cport {4} -p {5} 2>&1 >/dev/null"
    iperf_recv = "mx {0} iperf3 -s -p {1} --one-off 2>&1 >/dev/null"

    Popen("sudo killall iperf iperf3", shell=True)

    dst_port1 = random.randint(1024, 65000)
    dst_port2 = random.randint(1024, 65000)

    Popen(iperf_recv.format("h3", dst_port1), shell=True)
    Popen(iperf_recv.format("h4", dst_port2), shell=True)

    time.sleep(1)

    import sys
    duration = int(sys.argv[1])

    Popen(iperf_send.format("h1", topo.get_host_ip("h3"), duration, topo.get_host_ip("h1"), dst_port1, dst_port1), shell=True)
    Popen(iperf_send.format("h2", topo.get_host_ip("h4"), duration, topo.get_host_ip("h2"), dst_port2, dst_port2), shell=True)
  14. network.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    from p4utils.mininetlib.network_API import NetworkAPI

    net = NetworkAPI()

    # Network general options
    net.setLogLevel('info')
    net.execScript('python routing-controller.py', reboot=True)

    # Network definition
    net.addP4Switch('s1')
    net.addP4Switch('s2')
    net.addP4Switch('s3')
    net.addP4Switch('s4')
    net.setP4SourceAll('p4src/loadbalancer.p4')

    net.addHost('h1')
    net.addHost('h2')
    net.addHost('h3')
    net.addHost('h4')

    net.addLink("h1", "s1")
    net.addLink("h2", "s1")
    net.addLink("h3", "s4")
    net.addLink("h4", "s4")
    net.addLink("s1", "s2")
    net.addLink("s2", "s3")
    net.addLink("s2", "s4")
    net.addLink("s3", "s4")
    net.setBwAll(10)

    # Assignment strategy
    net.l3()

    # Nodes general options
    net.enablePcapDumpAll()
    net.enableLogAll()
    net.enableCli()
    net.startNetwork()
  15. get_switch_interfaces.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    from p4utils.utils.helper import load_topo

    topo = load_topo('topology.json')

    for host in sorted(topo.get_hosts().keys(), key = lambda x: int(x[1:])):

    host_intf = topo.get_host_first_interface(host)
    sw = topo.interface_to_node(host, host_intf)
    print(host, topo.get_intfs()[sw][host]['intfName'])

P4 10-Congestion Aware Load Balancing
http://example.com/2025/08/12/P4 10-Congestion Aware Load Balancing/
作者
Wsdbybyd
发布于
2025年8月12日
许可协议