P4 07-RSVP

P4示例程序-07 RSVP

  • 功能

    使用Meter表实现带宽限速。Meter本质就是一个流量计量器,它支持实时统计通过的流量,并根据配置的速率限制流量。底层实现一般是用令牌桶或者类似算法,令牌桶维护两个计数器:桶容量代表允许突发的最大流量、令牌生成速率用于控制流量的平均速率。在交换机中的逻辑实现可采用如下配置,当有数据包到达时:
  1. 桶里有足够令牌时:包被标记为绿色(正常,合规流量),消耗对应令牌数;
  2. 桶里令牌不足但尚未完全溢出时:包被标记为黄色(轻微超出限速,可以容忍);
  3. 令牌完全用完时:包被标记为红色(超出限速,丢弃或降级处理)。
  • 拓扑结构

拓扑结构
  • 代码

  1. p4app.json
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    "topology": {
    "assignment_strategy": "l3",
    "default": {
    "bw": 10
    },
    "links": [["h1", "s1"],
    ["h2", "s1"],
    ["s1", "s2"],
    ["s1", "s3"],
    ["s2", "s4"],
    ["s3", "s4"],
    ["s2", "s5"],
    ["s3", "s6"],
    ["s4", "s5"],
    ["s4", "s6"],
    ["s5", "s7"],
    ["s6", "s7"],
    ["s7", "h5"],
    ["s7", "h6"],
    ["h3", "s3"],
    ["h4", "s5"]
    ],
    "hosts": {
    "h1": {
    },
    "h2": {
    },
    "h3": {
    },
    "h4": {
    },
    "h5": {
    },
    "h6": {
    }
    },
    "switches": {
    "s1": {
    },
    "s2": {
    },
    "s3": {
    },
    "s4": {
    },
    "s5": {
    },
    "s6": {
    },
    "s7": {
    }
    }
    }
  2. Headers
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    typedef bit<9>  egressSpec_t;        // 9位出口端口号
    typedef bit<48> macAddr_t; // 48位 MAC 地址类型
    typedef bit<32> ip4Addr_t; // 32位 IPv4 地址类型
    typedef bit<20> label_t; // 20位 MPLS 标签类型

    header ethernet_t {
    macAddr_t dstAddr; // 目的 MAC 地址
    macAddr_t srcAddr; // 源 MAC 地址
    bit<16> etherType; // 以太网类型字段(IPv4/MPLS)
    }

    header mpls_t {
    bit<20> label; // MPLS 标签值
    bit<3> exp; // 实验字段(一般用于 QoS)
    bit<1> s; // Bottom of Stack 标志位
    bit<8> ttl; // Time To Live
    }

    header ipv4_t {
    bit<4> version; // IP 版本
    bit<4> ihl; // 头部长度
    bit<8> diffserv; // 服务类型字段
    bit<16> totalLen; // 总长度
    bit<16> identification; // 标识字段
    bit<3> flags; // 标志位
    bit<13> fragOffset; // 分段偏移量
    bit<8> ttl; // 生存时间
    bit<8> protocol; // 上层协议
    bit<16> hdrChecksum; // IP 头校验和
    ip4Addr_t srcAddr; // 源 IP 地址
    ip4Addr_t dstAddr; // 目的 IP 地址
    }

    struct metadata {
    bit<2> meter_color; // 速率限制颜色:0绿、1黄、2红(rsvp meter 读入)
    }

    struct headers {
    ethernet_t ethernet; // 以太网头
    mpls_t[CONST_MAX_MPLS_HOPS] mpls; // MPLS 标签栈(最多8个)
    ipv4_t ipv4; // IPv4 头
    }
  3. Parser
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32

    parser MyParser(packet_in packet,
    out headers hdr,
    inout metadata meta,
    inout standard_metadata_t standard_metadata) {

    state start {
    transition parse_ethernet; // 起始状态跳转到以太网解析
    }

    state parse_ethernet {
    packet.extract(hdr.ethernet); // 提取以太网头
    transition select(hdr.ethernet.etherType) {
    TYPE_MPLS: parse_mpls; // 若为 MPLS 类型,则跳转到 MPLS 解析状态
    TYPE_IPV4: parse_ipv4; // 若为 IPv4 类型,则跳转到 IPv4 解析状态
    default: accept; // 否则直接接受
    }
    }

    state parse_mpls {
    packet.extract(hdr.mpls.next); // 提取下一个 MPLS 标签
    transition select(hdr.mpls.last.s) {
    1: parse_ipv4; // 如果是最后一个 MPLS 标签,跳转 IPv4
    default: parse_mpls; // 否则继续提取下一个 MPLS
    }
    }

    state parse_ipv4 {
    packet.extract(hdr.ipv4); // 提取 IPv4 头
    transition accept; // 接受并结束解析
    }
    }
  4. Checksum Verification
    1
    2
    3
    4
    control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
    apply {
    }
    }
  5. Ingress Processing
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    354
    355
    356
    357
    358
    359
    360
    361
    362
    363
    364
    365
    366
    367
    368
    369
    370
    371
    372
    373
    374
    375
    376
    377
    378
    379
    380
    381
    382
    control MyIngress(inout headers hdr,
    inout metadata meta,
    inout standard_metadata_t standard_metadata) {

    /* Direct meter*/ // 定义基于字节的直接计量器,meter_color宽度2位
    direct_meter<bit<2>>(MeterType.bytes) rsvp_meter;

    // 丢弃包动作,标记丢弃
    action drop() {
    mark_to_drop(standard_metadata);
    }

    // IPv4转发动作,设置以太网MAC和出口端口,同时TTL减1
    action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {

    hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 以太网源地址设为原目标地址
    hdr.ethernet.dstAddr = dstAddr; // 以太网目标地址更新为下一跳地址

    standard_metadata.egress_spec = port; // 设置包出口端口
    hdr.ipv4.ttl = hdr.ipv4.ttl - 1; // IP TTL减1
    }

    // MPLS入栈1跳标签动作
    action mpls_ingress_1_hop(label_t label_1) {

    rsvp_meter.read(meta.meter_color); // 读计量器状态,更新meter_color

    hdr.ethernet.etherType = TYPE_MPLS; // 修改以太网类型为MPLS

    hdr.mpls.push_front(1); // 在标签栈前端插入1个标签
    hdr.mpls[0].setValid(); // 设置标签有效
    hdr.mpls[0].label = label_1; // 设置标签值
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1; // TTL设置为IPv4 TTL-1
    hdr.mpls[0].s = 1; // 标记为栈底标签
    }

    // MPLS入栈2跳标签动作
    action mpls_ingress_2_hop(label_t label_1, label_t label_2) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    hdr.mpls.push_front(1); // 先插入第1个标签
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1; // 栈底标签

    hdr.mpls.push_front(1); // 再插入第2个标签,成为新的栈顶
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0; // 非栈底标签
    }

    // MPLS入栈3跳标签动作
    action mpls_ingress_3_hop(label_t label_1, label_t label_2, label_t label_3) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    // 依次插入3个标签,注意s字段只有第1个为1,其他为0
    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // MPLS入栈4跳标签动作(以下类似,依次插入标签)
    action mpls_ingress_4_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_4;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // MPLS入栈5跳标签动作
    action mpls_ingress_5_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    // 依次插入5个标签,s字段仅第1个标签为1
    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_4;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_5;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // MPLS入栈6跳标签动作
    action mpls_ingress_6_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    // 依次插入6个标签
    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_4;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_5;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_6;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // MPLS入栈7跳标签动作
    action mpls_ingress_7_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6, label_t label_7) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    // 依次插入7个标签
    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_4;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_5;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_6;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_7;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // MPLS入栈8跳标签动作
    action mpls_ingress_8_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6, label_t label_7, label_t label_8) {
    rsvp_meter.read(meta.meter_color);

    hdr.ethernet.etherType = TYPE_MPLS;

    // 依次插入8个标签,s字段同前
    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_1;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 1;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_2;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_3;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_4;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_5;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_6;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_7;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;

    hdr.mpls.push_front(1);
    hdr.mpls[0].setValid();
    hdr.mpls[0].label = label_8;
    hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
    hdr.mpls[0].s = 0;
    }

    // FEC(Forwarding Equivalence Class)查表,匹配IP源和目的地址
    table FEC_tbl {
    key = {
    hdr.ipv4.srcAddr: lpm; // 源地址最长前缀匹配
    hdr.ipv4.dstAddr: exact; // 目的地址精确匹配
    }
    actions = { // 可执行动作集
    ipv4_forward; // IPv4转发
    mpls_ingress_1_hop; // MPLS入栈1跳
    mpls_ingress_2_hop; // MPLS入栈2跳
    mpls_ingress_3_hop; // MPLS入栈3跳
    mpls_ingress_4_hop; // MPLS入栈4跳
    mpls_ingress_5_hop; // MPLS入栈5跳
    mpls_ingress_6_hop; // MPLS入栈6跳
    mpls_ingress_7_hop; // MPLS入栈7跳
    mpls_ingress_8_hop; // MPLS入栈8跳
    NoAction; // 无动作
    }
    default_action = NoAction(); // 默认无动作
    meters = rsvp_meter; // 关联计量器
    size = 256; // 表大小256条
    }

    // MPLS转发动作
    action mpls_forward(macAddr_t dstAddr, egressSpec_t port) {

    hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 以太网源地址设为原目标地址
    hdr.ethernet.dstAddr = dstAddr; // 更新以太网目标地址

    standard_metadata.egress_spec = port; // 设置出口端口

    hdr.mpls[1].ttl = hdr.mpls[0].ttl - 1; // 下一个MPLS标签TTL减1

    hdr.mpls.pop_front(1); // 弹出当前栈顶标签
    }

    // Penultimate Hop Popping(倒数第二跳弹栈)动作
    action penultimate(macAddr_t dstAddr, egressSpec_t port) {

    hdr.ethernet.etherType = TYPE_IPV4; // 将以太网类型设置回IPv4

    hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 更新以太网源地址
    hdr.ethernet.dstAddr = dstAddr; // 更新以太网目标地址

    hdr.ipv4.ttl = hdr.mpls[0].ttl - 1; // 设置IP TTL

    standard_metadata.egress_spec = port; // 设置出口端口
    hdr.mpls.pop_front(1); // 弹出MPLS栈顶标签
    }

    // MPLS查表,匹配栈顶标签和栈底标记s
    table mpls_tbl {
    key = {
    hdr.mpls[0].label: exact; // 精确匹配标签
    hdr.mpls[0].s: exact; // 精确匹配栈底位
    }
    actions = {
    mpls_forward; // 普通MPLS转发动作
    penultimate; // 倒数第二跳弹栈动作
    NoAction; // 无动作
    }
    default_action = NoAction(); // 默认无动作
    size = CONST_MAX_LABELS; // 表大小128条
    }

    apply {
    /* Ingress Pipeline Control Logic */
    if(hdr.ipv4.isValid()){ // 如果是IPv4包
    FEC_tbl.apply(); // 运行FEC查表
    }
    if(hdr.mpls[0].isValid()){ // 如果存在MPLS标签
    mpls_tbl.apply(); // 运行MPLS查表
    }

    /* 如果计量器颜色不是绿色(0),则丢弃该包 */
    if (meta.meter_color != 0)
    {
    drop();
    }

    }
    }
  6. Egress Processing
    1
    2
    3
    4
    5
    6
    control MyEgress(inout headers hdr,
    inout metadata meta,
    inout standard_metadata_t standard_metadata) {
    apply {
    }
    }
  7. Checksum Computation
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    control MyComputeChecksum(inout headers  hdr, inout metadata meta) {
    apply {

    // 修改了IP TTL,需要重新计算IPv4头部校验和
    update_checksum(
    hdr.ipv4.isValid(), // 仅对有效IPv4头计算
    { hdr.ipv4.version,
    hdr.ipv4.ihl,
    hdr.ipv4.diffserv,
    hdr.ipv4.totalLen,
    hdr.ipv4.identification,
    hdr.ipv4.flags,
    hdr.ipv4.fragOffset,
    hdr.ipv4.ttl,
    hdr.ipv4.protocol,
    hdr.ipv4.srcAddr,
    hdr.ipv4.dstAddr },
    hdr.ipv4.hdrChecksum, // 更新校验和值字段
    HashAlgorithm.csum16); // 使用16位校验和算法
    }
    }
  8. DE parser
    1
    2
    3
    4
    5
    6
    7
    control MyDeparser(packet_out packet, in headers hdr) {
    apply {
    packet.emit(hdr.ethernet); // 输出以太网头部
    packet.emit(hdr.mpls); // 输出MPLS标签栈(全部有效标签)
    packet.emit(hdr.ipv4); // 输出IPv4头部
    }
    }
  9. Switch
    1
    2
    3
    4
    5
    6
    7
    8
    V1Switch(
    MyParser(),
    MyVerifyChecksum(),
    MyIngress(),
    MyEgress(),
    MyComputeChecksum(),
    MyDeparser()
    ) main;
  10. controller.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    # 导入标准库和所需模块
    import os # 提供文件和路径操作
    import threading # 用于创建并发线程
    import time # 提供时间控制函数
    from p4utils.utils.helper import load_topo # 导入拓扑加载函数
    from p4utils.utils.sswitch_p4runtime_API import SimpleSwitchP4RuntimeAPI # P4Runtime API 控制器
    from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI # Thrift API 控制器
    from cli import RSVPCLI # 命令行接口模块(用户交互)

    # 定义 RSVP 控制器类
    class RSVPController(object):

    def __init__(self):
    """初始化拓扑与数据结构"""
    # 检查拓扑文件是否存在
    if not os.path.exists('topology.json'):
    print('找不到 topology.json 文件!')
    raise Exception

    self.topo = load_topo('topology.json') # 加载拓扑
    self.controllers = {} # 存储每个交换机的控制器对象
    self.init() # 初始化连接与状态重置

    self.current_reservations = {} # 当前所有活跃的带宽预留请求
    self.links_capacity = self.build_links_capacity() # 初始化链路容量

    self.update_lock = threading.Lock() # 多线程访问保护锁
    # 创建定时检查预留超时的线程
    self.timeout_thread = threading.Thread(target=self.reservations_timeout_thread, args=(1,))
    self.timeout_thread.daemon = True # 设置为后台线程
    self.timeout_thread.start() # 启动线程

    def init(self):
    """连接所有交换机并重置其状态"""
    self.connect_to_switches() # 建立与所有交换机的 gRPC 连接
    self.reset_states() # 重置交换机状态

    def reset_states(self):
    """重置所有交换机的表项和状态"""
    for p4rtswitch, controller in self.controllers.items():
    controller.reset_state() # 重置 P4Runtime 控制器状态
    thrift_port = self.topo.get_thrift_port(p4rtswitch) # 获取 Thrift 端口
    controller_thrift = SimpleSwitchThriftAPI(thrift_port) # 建立 Thrift 连接
    controller_thrift.reset_state() # 重置 Thrift 交换机状态(如转发表)

    def connect_to_switches(self):
    """连接所有 P4 交换机并初始化控制器"""
    for p4rtswitch, data in self.topo.get_p4switches().items():
    device_id = self.topo.get_p4switch_id(p4rtswitch)
    grpc_port = self.topo.get_grpc_port(p4rtswitch)
    p4rt_path = data['p4rt_path']
    json_path = data['json_path']
    # 创建 SimpleSwitchP4RuntimeAPI 控制器对象并存储
    self.controllers[p4rtswitch] = SimpleSwitchP4RuntimeAPI(
    device_id, grpc_port, p4rt_path=p4rt_path, json_path=json_path)

    def build_links_capacity(self):
    """构建链路容量字典"""
    links_capacity = {}
    for src, dst in self.topo.keep_only_p4switches().edges:
    bw = self.topo.edges[(src, dst)]['bw'] # 获取链路带宽
    links_capacity[(src, dst)] = bw
    links_capacity[(dst, src)] = bw # 双向链路
    return links_capacity

    def reservations_timeout_thread(self, refresh_rate=1):
    """每隔 refresh_rate 秒检查一次是否有超时的预留请求"""
    while True:
    time.sleep(refresh_rate)
    with self.update_lock: # 加锁避免数据冲突
    to_remove = []
    for reservation, data in self.current_reservations.items():
    data['timeout'] -= refresh_rate
    if data['timeout'] <= 0: # 超时处理
    to_remove.append(reservation)
    for reservation in to_remove: # 批量删除过期预留
    self.del_reservation(*reservation)

    def set_mpls_tbl_labels(self):
    """设置默认 MPLS 标签转发表项"""
    for sw_name, controller in self.controllers.items():
    for host in self.topo.get_hosts_connected_to(sw_name):
    sw_port = self.topo.node_to_node_port_num(sw_name, host)
    host_ip = self.topo.get_host_ip(host)
    host_mac = self.topo.get_host_mac(host)
    controller.table_add('FEC_tbl', 'ipv4_forward',
    ['0.0.0.0/0', str(host_ip)],
    [str(host_mac), str(sw_port)])
    for switch in self.topo.get_switches_connected_to(sw_name):
    sw_port = self.topo.node_to_node_port_num(sw_name, switch)
    other_switch_mac = self.topo.node_to_node_mac(switch, sw_name)
    controller.table_add('mpls_tbl', 'mpls_forward',
    [str(sw_port), '0'], [str(other_switch_mac), str(sw_port)])
    controller.table_add('mpls_tbl', 'penultimate',
    [str(sw_port), '1'], [str(other_switch_mac), str(sw_port)])

    def build_mpls_path(self, switches_path):
    """生成 MPLS 标签路径(每跳为对应端口号)"""
    label_path = []
    for current_node, next_node in zip(switches_path, switches_path[1:]):
    label = self.topo.node_to_node_port_num(current_node, next_node)
    label_path.append(label)
    return label_path

    def get_sorted_paths(self, src, dst):
    """返回从 src 到 dst 的所有路径,按长度排序"""
    paths = self.topo.get_all_paths_between_nodes(src, dst)
    paths = [x[1:-1] for x in paths] # 去除起点和终点(主机)
    return paths

    def get_shortest_path(self, src, dst):
    """返回最短路径(不考虑链路带宽)"""
    return self.get_sorted_paths(src, dst)[0]

    def check_if_reservation_fits(self, path, bw):
    """检查路径上所有链路是否都有足够带宽"""
    for link in zip(path, path[1:]):
    if (self.links_capacity[link] - bw) < 0:
    return False
    return True

    def add_link_capacity(self, path, bw):
    """释放路径上所有链路的带宽(撤销预留)"""
    for link in zip(path, path[1:]):
    self.links_capacity[link] += bw

    def sub_link_capacity(self, path, bw):
    """占用路径上所有链路的带宽"""
    for link in zip(path, path[1:]):
    self.links_capacity[link] -= bw

    def get_available_path(self, src, dst, bw):
    """返回最短、可用带宽满足的路径"""
    paths = self.get_sorted_paths(src, dst)
    for path in paths:
    if self.check_if_reservation_fits(path, bw):
    return path
    return False

    def get_meter_rates_from_bw(self, bw, burst_size=700000):
    """将带宽转换为 CIR 和 PIR 速率配置(用于速率限制器)"""
    rates = []
    rates.append((int(0.125 * bw * 1e6), burst_size))
    rates.append((int(0.125 * bw * 1e6), burst_size))
    return rates

    def _add_reservation(self, src, dst, duration, bandwidth, priority, path, update):
    """实际执行添加或更新预留"""
    label_path = [str(x) for x in self.build_mpls_path(path)[::-1]] # 标签反向压栈
    rates = self.get_meter_rates_from_bw(bandwidth)
    src_gw = path[0]
    action = 'mpls_ingress_{}_hop'.format(len(label_path))
    src_ip = str(self.topo.get_host_ip(src) + '/32')
    dst_ip = str(self.topo.get_host_ip(dst))
    match = [src_ip, dst_ip]

    if len(label_path) != 0:
    if not update:
    done = self.controllers[src_gw].table_add('FEC_tbl', action, match, label_path, rates=rates)
    entry_match = match
    else:
    entry = self.current_reservations.get((src, dst))
    done = self.controllers[src_gw].table_modify_match('FEC_tbl', action, entry['match'], label_path, rates=rates)
    entry_match = entry['match']

    if done:
    self.sub_link_capacity(path, bandwidth)
    self.current_reservations[(src, dst)] = {'timeout': duration, 'bw': bandwidth, 'priority': priority, 'match': entry_match, 'path': path}
    print(f'Successful reservation({src}->{dst}): path: {"->".join(path)}')
    else:
    print(f'\033[91mFailed reservation({src}->{dst}): path: {"->".join(path)}\033[0m')
    else:
    print('Warning: Hosts are connected to the same switch!')

    def add_reservation(self, src, dst, duration, bandwidth, priority):
    """添加新的带宽预留请求,带优先级调度"""
    with self.update_lock:
    UPDATE_ENTRY = False
    if self.current_reservations.get((src, dst)):
    data = self.current_reservations[(src, dst)]
    self.add_link_capacity(data['path'], data['bw'])
    UPDATE_ENTRY = True

    path = self.get_available_path(src, dst, bandwidth)

    if path:
    self._add_reservation(src, dst, duration, bandwidth, priority, path, UPDATE_ENTRY)
    else:
    previous_links_capacities = self.links_capacity.copy()
    for reservation, data in self.current_reservations.items():
    if reservation == (src, dst):
    continue
    if data['priority'] < priority:
    self.add_link_capacity(data['path'], data['bw'])

    path = self.get_available_path(src, dst, bandwidth)

    if path:
    self._add_reservation(src, dst, duration, bandwidth, priority, path, UPDATE_ENTRY)

    for reservation, data in sorted(self.current_reservations.items(), key=lambda x: x[1]['priority'], reverse=True):
    if data['priority'] < priority:
    src_r, dst_r = reservation
    new_path = self.get_available_path(src_r, dst_r, data['bw'])
    if new_path:
    self._add_reservation(src_r, dst_r, data['timeout'], data['bw'], data['priority'], new_path, True)
    else:
    self.sub_link_capacity(data['path'], data['bw'])
    print(f'\033[91mDeleting allocation {src_r}->{dst_r} due to a higher priority allocation!\033[0m')
    self.del_reservation(src_r, dst_r)
    else:
    self.links_capacity = previous_links_capacities
    if UPDATE_ENTRY:
    data = self.current_reservations[(src, dst)]
    self.sub_link_capacity(data['path'], data['bw'])
    print('Deleting new allocation. Does not fit anymore!')
    self.del_reservation(src, dst)
    print('\033[91mRESERVATION FAILURE: no bandwidth available!\033[0m')

    def del_reservation(self, src, dst):
    """删除一个预留并恢复链路带宽"""
    entry = self.current_reservations.get((src, dst))
    if entry:
    sw_gw = self.topo.get_host_gateway_name(src)
    self.controllers[sw_gw].table_delete_match('FEC_tbl', entry['match'])
    self.add_link_capacity(entry['path'], entry['bw'])
    del self.current_reservations[(src, dst)]
    print(f'\nRSVP Deleted/Expired Reservation({src}->{dst}): path: {"->".join(entry["path"])}')
    else:
    print(f'No entry for {src} -> {dst}')

    def del_all_reservations(self):
    """删除所有预留请求"""
    with self.update_lock:
    for src, dst in list(self.current_reservations.keys()):
    self.del_reservation(src, dst)

    # 主程序入口
    if __name__ == '__main__':
    controller = RSVPController() # 实例化控制器
    controller.set_mpls_tbl_labels() # 初始化 MPLS 转发表项
    cli = RSVPCLI(controller) # 启动交互式命令行接口

  11. network.py
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    from p4utils.mininetlib.network_API import NetworkAPI

    net = NetworkAPI()

    # Network general options
    net.setLogLevel('info')
    net.setCompiler(p4rt=True)

    # Network definition
    net.addP4RuntimeSwitch('s1')
    net.addP4RuntimeSwitch('s2')
    net.addP4RuntimeSwitch('s3')
    net.addP4RuntimeSwitch('s4')
    net.addP4RuntimeSwitch('s5')
    net.addP4RuntimeSwitch('s6')
    net.addP4RuntimeSwitch('s7')
    net.setP4SourceAll('rsvp.p4')

    net.addHost('h1')
    net.addHost('h2')
    net.addHost('h3')
    net.addHost('h4')
    net.addHost('h5')
    net.addHost('h6')

    net.addLink("h1", "s1")
    net.addLink("h2", "s1")
    net.addLink("s1", "s2")
    net.addLink("s1", "s3")
    net.addLink("s2", "s4")
    net.addLink("s3", "s4")
    net.addLink("s2", "s5")
    net.addLink("s3", "s6")
    net.addLink("s4", "s5")
    net.addLink("s4", "s6")
    net.addLink("s5", "s7")
    net.addLink("s6", "s7")
    net.addLink("s7", "h5")
    net.addLink("s7", "h6")
    net.addLink("h3", "s3")
    net.addLink("h4", "s5")
    net.setBwAll(10)

    # Assignment strategy
    net.l3()

    # Nodes general options
    net.disablePcapDumpAll()
    net.enableLogAll()
    net.enableCli()
    net.startNetwork()
  • P4仿真(以rsvp功能为例)

  1. 启动网络拓扑sudo p4run
启动网络拓扑
  1. 启动控制器python rsvp_controller.py
启动控制器
  1. mininet CLI命令窗口输入h1 ping h6,发现无法通信
无法通信
  1. rsvpCLI命令窗口输入add_reservation h1 h6 20 5和`add_reservation h6 h1 20 5
配置rsvp
  1. 再次在mininet CLI命令窗口输入h1 ping h6,发现成功通信
成功通信

P4 07-RSVP
http://example.com/2025/08/04/P4 07-RSVP/
作者
Wsdbybyd
发布于
2025年8月4日
许可协议