P4 07-RSVP
P4示例程序-07 RSVP
功能
使用Meter表实现带宽限速。Meter本质就是一个流量计量器,它支持实时统计通过的流量,并根据配置的速率限制流量。底层实现一般是用令牌桶或者类似算法,令牌桶维护两个计数器:桶容量代表允许突发的最大流量、令牌生成速率用于控制流量的平均速率。在交换机中的逻辑实现可采用如下配置,当有数据包到达时:
- 桶里有足够令牌时:包被标记为绿色(正常,合规流量),消耗对应令牌数;
- 桶里令牌不足但尚未完全溢出时:包被标记为黄色(轻微超出限速,可以容忍);
- 令牌完全用完时:包被标记为红色(超出限速,丢弃或降级处理)。
拓扑结构
代码
p4app.json1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53"topology": {
"assignment_strategy": "l3",
"default": {
"bw": 10
},
"links": [["h1", "s1"],
["h2", "s1"],
["s1", "s2"],
["s1", "s3"],
["s2", "s4"],
["s3", "s4"],
["s2", "s5"],
["s3", "s6"],
["s4", "s5"],
["s4", "s6"],
["s5", "s7"],
["s6", "s7"],
["s7", "h5"],
["s7", "h6"],
["h3", "s3"],
["h4", "s5"]
],
"hosts": {
"h1": {
},
"h2": {
},
"h3": {
},
"h4": {
},
"h5": {
},
"h6": {
}
},
"switches": {
"s1": {
},
"s2": {
},
"s3": {
},
"s4": {
},
"s5": {
},
"s6": {
},
"s7": {
}
}
}- Headers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42typedef bit<9> egressSpec_t; // 9位出口端口号
typedef bit<48> macAddr_t; // 48位 MAC 地址类型
typedef bit<32> ip4Addr_t; // 32位 IPv4 地址类型
typedef bit<20> label_t; // 20位 MPLS 标签类型
header ethernet_t {
macAddr_t dstAddr; // 目的 MAC 地址
macAddr_t srcAddr; // 源 MAC 地址
bit<16> etherType; // 以太网类型字段(IPv4/MPLS)
}
header mpls_t {
bit<20> label; // MPLS 标签值
bit<3> exp; // 实验字段(一般用于 QoS)
bit<1> s; // Bottom of Stack 标志位
bit<8> ttl; // Time To Live
}
header ipv4_t {
bit<4> version; // IP 版本
bit<4> ihl; // 头部长度
bit<8> diffserv; // 服务类型字段
bit<16> totalLen; // 总长度
bit<16> identification; // 标识字段
bit<3> flags; // 标志位
bit<13> fragOffset; // 分段偏移量
bit<8> ttl; // 生存时间
bit<8> protocol; // 上层协议
bit<16> hdrChecksum; // IP 头校验和
ip4Addr_t srcAddr; // 源 IP 地址
ip4Addr_t dstAddr; // 目的 IP 地址
}
struct metadata {
bit<2> meter_color; // 速率限制颜色:0绿、1黄、2红(rsvp meter 读入)
}
struct headers {
ethernet_t ethernet; // 以太网头
mpls_t[CONST_MAX_MPLS_HOPS] mpls; // MPLS 标签栈(最多8个)
ipv4_t ipv4; // IPv4 头
} - Parser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
parser MyParser(packet_in packet,
out headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
state start {
transition parse_ethernet; // 起始状态跳转到以太网解析
}
state parse_ethernet {
packet.extract(hdr.ethernet); // 提取以太网头
transition select(hdr.ethernet.etherType) {
TYPE_MPLS: parse_mpls; // 若为 MPLS 类型,则跳转到 MPLS 解析状态
TYPE_IPV4: parse_ipv4; // 若为 IPv4 类型,则跳转到 IPv4 解析状态
default: accept; // 否则直接接受
}
}
state parse_mpls {
packet.extract(hdr.mpls.next); // 提取下一个 MPLS 标签
transition select(hdr.mpls.last.s) {
1: parse_ipv4; // 如果是最后一个 MPLS 标签,跳转 IPv4
default: parse_mpls; // 否则继续提取下一个 MPLS
}
}
state parse_ipv4 {
packet.extract(hdr.ipv4); // 提取 IPv4 头
transition accept; // 接受并结束解析
}
} - Checksum Verification
1
2
3
4control MyVerifyChecksum(inout headers hdr, inout metadata meta) {
apply {
}
} - Ingress Processing
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382control MyIngress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
/* Direct meter*/ // 定义基于字节的直接计量器,meter_color宽度2位
direct_meter<bit<2>>(MeterType.bytes) rsvp_meter;
// 丢弃包动作,标记丢弃
action drop() {
mark_to_drop(standard_metadata);
}
// IPv4转发动作,设置以太网MAC和出口端口,同时TTL减1
action ipv4_forward(macAddr_t dstAddr, egressSpec_t port) {
hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 以太网源地址设为原目标地址
hdr.ethernet.dstAddr = dstAddr; // 以太网目标地址更新为下一跳地址
standard_metadata.egress_spec = port; // 设置包出口端口
hdr.ipv4.ttl = hdr.ipv4.ttl - 1; // IP TTL减1
}
// MPLS入栈1跳标签动作
action mpls_ingress_1_hop(label_t label_1) {
rsvp_meter.read(meta.meter_color); // 读计量器状态,更新meter_color
hdr.ethernet.etherType = TYPE_MPLS; // 修改以太网类型为MPLS
hdr.mpls.push_front(1); // 在标签栈前端插入1个标签
hdr.mpls[0].setValid(); // 设置标签有效
hdr.mpls[0].label = label_1; // 设置标签值
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1; // TTL设置为IPv4 TTL-1
hdr.mpls[0].s = 1; // 标记为栈底标签
}
// MPLS入栈2跳标签动作
action mpls_ingress_2_hop(label_t label_1, label_t label_2) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
hdr.mpls.push_front(1); // 先插入第1个标签
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1; // 栈底标签
hdr.mpls.push_front(1); // 再插入第2个标签,成为新的栈顶
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0; // 非栈底标签
}
// MPLS入栈3跳标签动作
action mpls_ingress_3_hop(label_t label_1, label_t label_2, label_t label_3) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
// 依次插入3个标签,注意s字段只有第1个为1,其他为0
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// MPLS入栈4跳标签动作(以下类似,依次插入标签)
action mpls_ingress_4_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_4;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// MPLS入栈5跳标签动作
action mpls_ingress_5_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
// 依次插入5个标签,s字段仅第1个标签为1
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_4;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_5;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// MPLS入栈6跳标签动作
action mpls_ingress_6_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
// 依次插入6个标签
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_4;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_5;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_6;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// MPLS入栈7跳标签动作
action mpls_ingress_7_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6, label_t label_7) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
// 依次插入7个标签
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_4;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_5;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_6;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_7;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// MPLS入栈8跳标签动作
action mpls_ingress_8_hop(label_t label_1, label_t label_2, label_t label_3, label_t label_4, label_t label_5, label_t label_6, label_t label_7, label_t label_8) {
rsvp_meter.read(meta.meter_color);
hdr.ethernet.etherType = TYPE_MPLS;
// 依次插入8个标签,s字段同前
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_1;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 1;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_2;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_3;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_4;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_5;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_6;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_7;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
hdr.mpls.push_front(1);
hdr.mpls[0].setValid();
hdr.mpls[0].label = label_8;
hdr.mpls[0].ttl = hdr.ipv4.ttl - 1;
hdr.mpls[0].s = 0;
}
// FEC(Forwarding Equivalence Class)查表,匹配IP源和目的地址
table FEC_tbl {
key = {
hdr.ipv4.srcAddr: lpm; // 源地址最长前缀匹配
hdr.ipv4.dstAddr: exact; // 目的地址精确匹配
}
actions = { // 可执行动作集
ipv4_forward; // IPv4转发
mpls_ingress_1_hop; // MPLS入栈1跳
mpls_ingress_2_hop; // MPLS入栈2跳
mpls_ingress_3_hop; // MPLS入栈3跳
mpls_ingress_4_hop; // MPLS入栈4跳
mpls_ingress_5_hop; // MPLS入栈5跳
mpls_ingress_6_hop; // MPLS入栈6跳
mpls_ingress_7_hop; // MPLS入栈7跳
mpls_ingress_8_hop; // MPLS入栈8跳
NoAction; // 无动作
}
default_action = NoAction(); // 默认无动作
meters = rsvp_meter; // 关联计量器
size = 256; // 表大小256条
}
// MPLS转发动作
action mpls_forward(macAddr_t dstAddr, egressSpec_t port) {
hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 以太网源地址设为原目标地址
hdr.ethernet.dstAddr = dstAddr; // 更新以太网目标地址
standard_metadata.egress_spec = port; // 设置出口端口
hdr.mpls[1].ttl = hdr.mpls[0].ttl - 1; // 下一个MPLS标签TTL减1
hdr.mpls.pop_front(1); // 弹出当前栈顶标签
}
// Penultimate Hop Popping(倒数第二跳弹栈)动作
action penultimate(macAddr_t dstAddr, egressSpec_t port) {
hdr.ethernet.etherType = TYPE_IPV4; // 将以太网类型设置回IPv4
hdr.ethernet.srcAddr = hdr.ethernet.dstAddr; // 更新以太网源地址
hdr.ethernet.dstAddr = dstAddr; // 更新以太网目标地址
hdr.ipv4.ttl = hdr.mpls[0].ttl - 1; // 设置IP TTL
standard_metadata.egress_spec = port; // 设置出口端口
hdr.mpls.pop_front(1); // 弹出MPLS栈顶标签
}
// MPLS查表,匹配栈顶标签和栈底标记s
table mpls_tbl {
key = {
hdr.mpls[0].label: exact; // 精确匹配标签
hdr.mpls[0].s: exact; // 精确匹配栈底位
}
actions = {
mpls_forward; // 普通MPLS转发动作
penultimate; // 倒数第二跳弹栈动作
NoAction; // 无动作
}
default_action = NoAction(); // 默认无动作
size = CONST_MAX_LABELS; // 表大小128条
}
apply {
/* Ingress Pipeline Control Logic */
if(hdr.ipv4.isValid()){ // 如果是IPv4包
FEC_tbl.apply(); // 运行FEC查表
}
if(hdr.mpls[0].isValid()){ // 如果存在MPLS标签
mpls_tbl.apply(); // 运行MPLS查表
}
/* 如果计量器颜色不是绿色(0),则丢弃该包 */
if (meta.meter_color != 0)
{
drop();
}
}
} - Egress Processing
1
2
3
4
5
6control MyEgress(inout headers hdr,
inout metadata meta,
inout standard_metadata_t standard_metadata) {
apply {
}
} - Checksum Computation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21control MyComputeChecksum(inout headers hdr, inout metadata meta) {
apply {
// 修改了IP TTL,需要重新计算IPv4头部校验和
update_checksum(
hdr.ipv4.isValid(), // 仅对有效IPv4头计算
{ hdr.ipv4.version,
hdr.ipv4.ihl,
hdr.ipv4.diffserv,
hdr.ipv4.totalLen,
hdr.ipv4.identification,
hdr.ipv4.flags,
hdr.ipv4.fragOffset,
hdr.ipv4.ttl,
hdr.ipv4.protocol,
hdr.ipv4.srcAddr,
hdr.ipv4.dstAddr },
hdr.ipv4.hdrChecksum, // 更新校验和值字段
HashAlgorithm.csum16); // 使用16位校验和算法
}
} - DE parser
1
2
3
4
5
6
7control MyDeparser(packet_out packet, in headers hdr) {
apply {
packet.emit(hdr.ethernet); // 输出以太网头部
packet.emit(hdr.mpls); // 输出MPLS标签栈(全部有效标签)
packet.emit(hdr.ipv4); // 输出IPv4头部
}
} - Switch
1
2
3
4
5
6
7
8V1Switch(
MyParser(),
MyVerifyChecksum(),
MyIngress(),
MyEgress(),
MyComputeChecksum(),
MyDeparser()
) main; controller.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243# 导入标准库和所需模块
import os # 提供文件和路径操作
import threading # 用于创建并发线程
import time # 提供时间控制函数
from p4utils.utils.helper import load_topo # 导入拓扑加载函数
from p4utils.utils.sswitch_p4runtime_API import SimpleSwitchP4RuntimeAPI # P4Runtime API 控制器
from p4utils.utils.sswitch_thrift_API import SimpleSwitchThriftAPI # Thrift API 控制器
from cli import RSVPCLI # 命令行接口模块(用户交互)
# 定义 RSVP 控制器类
class RSVPController(object):
def __init__(self):
"""初始化拓扑与数据结构"""
# 检查拓扑文件是否存在
if not os.path.exists('topology.json'):
print('找不到 topology.json 文件!')
raise Exception
self.topo = load_topo('topology.json') # 加载拓扑
self.controllers = {} # 存储每个交换机的控制器对象
self.init() # 初始化连接与状态重置
self.current_reservations = {} # 当前所有活跃的带宽预留请求
self.links_capacity = self.build_links_capacity() # 初始化链路容量
self.update_lock = threading.Lock() # 多线程访问保护锁
# 创建定时检查预留超时的线程
self.timeout_thread = threading.Thread(target=self.reservations_timeout_thread, args=(1,))
self.timeout_thread.daemon = True # 设置为后台线程
self.timeout_thread.start() # 启动线程
def init(self):
"""连接所有交换机并重置其状态"""
self.connect_to_switches() # 建立与所有交换机的 gRPC 连接
self.reset_states() # 重置交换机状态
def reset_states(self):
"""重置所有交换机的表项和状态"""
for p4rtswitch, controller in self.controllers.items():
controller.reset_state() # 重置 P4Runtime 控制器状态
thrift_port = self.topo.get_thrift_port(p4rtswitch) # 获取 Thrift 端口
controller_thrift = SimpleSwitchThriftAPI(thrift_port) # 建立 Thrift 连接
controller_thrift.reset_state() # 重置 Thrift 交换机状态(如转发表)
def connect_to_switches(self):
"""连接所有 P4 交换机并初始化控制器"""
for p4rtswitch, data in self.topo.get_p4switches().items():
device_id = self.topo.get_p4switch_id(p4rtswitch)
grpc_port = self.topo.get_grpc_port(p4rtswitch)
p4rt_path = data['p4rt_path']
json_path = data['json_path']
# 创建 SimpleSwitchP4RuntimeAPI 控制器对象并存储
self.controllers[p4rtswitch] = SimpleSwitchP4RuntimeAPI(
device_id, grpc_port, p4rt_path=p4rt_path, json_path=json_path)
def build_links_capacity(self):
"""构建链路容量字典"""
links_capacity = {}
for src, dst in self.topo.keep_only_p4switches().edges:
bw = self.topo.edges[(src, dst)]['bw'] # 获取链路带宽
links_capacity[(src, dst)] = bw
links_capacity[(dst, src)] = bw # 双向链路
return links_capacity
def reservations_timeout_thread(self, refresh_rate=1):
"""每隔 refresh_rate 秒检查一次是否有超时的预留请求"""
while True:
time.sleep(refresh_rate)
with self.update_lock: # 加锁避免数据冲突
to_remove = []
for reservation, data in self.current_reservations.items():
data['timeout'] -= refresh_rate
if data['timeout'] <= 0: # 超时处理
to_remove.append(reservation)
for reservation in to_remove: # 批量删除过期预留
self.del_reservation(*reservation)
def set_mpls_tbl_labels(self):
"""设置默认 MPLS 标签转发表项"""
for sw_name, controller in self.controllers.items():
for host in self.topo.get_hosts_connected_to(sw_name):
sw_port = self.topo.node_to_node_port_num(sw_name, host)
host_ip = self.topo.get_host_ip(host)
host_mac = self.topo.get_host_mac(host)
controller.table_add('FEC_tbl', 'ipv4_forward',
['0.0.0.0/0', str(host_ip)],
[str(host_mac), str(sw_port)])
for switch in self.topo.get_switches_connected_to(sw_name):
sw_port = self.topo.node_to_node_port_num(sw_name, switch)
other_switch_mac = self.topo.node_to_node_mac(switch, sw_name)
controller.table_add('mpls_tbl', 'mpls_forward',
[str(sw_port), '0'], [str(other_switch_mac), str(sw_port)])
controller.table_add('mpls_tbl', 'penultimate',
[str(sw_port), '1'], [str(other_switch_mac), str(sw_port)])
def build_mpls_path(self, switches_path):
"""生成 MPLS 标签路径(每跳为对应端口号)"""
label_path = []
for current_node, next_node in zip(switches_path, switches_path[1:]):
label = self.topo.node_to_node_port_num(current_node, next_node)
label_path.append(label)
return label_path
def get_sorted_paths(self, src, dst):
"""返回从 src 到 dst 的所有路径,按长度排序"""
paths = self.topo.get_all_paths_between_nodes(src, dst)
paths = [x[1:-1] for x in paths] # 去除起点和终点(主机)
return paths
def get_shortest_path(self, src, dst):
"""返回最短路径(不考虑链路带宽)"""
return self.get_sorted_paths(src, dst)[0]
def check_if_reservation_fits(self, path, bw):
"""检查路径上所有链路是否都有足够带宽"""
for link in zip(path, path[1:]):
if (self.links_capacity[link] - bw) < 0:
return False
return True
def add_link_capacity(self, path, bw):
"""释放路径上所有链路的带宽(撤销预留)"""
for link in zip(path, path[1:]):
self.links_capacity[link] += bw
def sub_link_capacity(self, path, bw):
"""占用路径上所有链路的带宽"""
for link in zip(path, path[1:]):
self.links_capacity[link] -= bw
def get_available_path(self, src, dst, bw):
"""返回最短、可用带宽满足的路径"""
paths = self.get_sorted_paths(src, dst)
for path in paths:
if self.check_if_reservation_fits(path, bw):
return path
return False
def get_meter_rates_from_bw(self, bw, burst_size=700000):
"""将带宽转换为 CIR 和 PIR 速率配置(用于速率限制器)"""
rates = []
rates.append((int(0.125 * bw * 1e6), burst_size))
rates.append((int(0.125 * bw * 1e6), burst_size))
return rates
def _add_reservation(self, src, dst, duration, bandwidth, priority, path, update):
"""实际执行添加或更新预留"""
label_path = [str(x) for x in self.build_mpls_path(path)[::-1]] # 标签反向压栈
rates = self.get_meter_rates_from_bw(bandwidth)
src_gw = path[0]
action = 'mpls_ingress_{}_hop'.format(len(label_path))
src_ip = str(self.topo.get_host_ip(src) + '/32')
dst_ip = str(self.topo.get_host_ip(dst))
match = [src_ip, dst_ip]
if len(label_path) != 0:
if not update:
done = self.controllers[src_gw].table_add('FEC_tbl', action, match, label_path, rates=rates)
entry_match = match
else:
entry = self.current_reservations.get((src, dst))
done = self.controllers[src_gw].table_modify_match('FEC_tbl', action, entry['match'], label_path, rates=rates)
entry_match = entry['match']
if done:
self.sub_link_capacity(path, bandwidth)
self.current_reservations[(src, dst)] = {'timeout': duration, 'bw': bandwidth, 'priority': priority, 'match': entry_match, 'path': path}
print(f'Successful reservation({src}->{dst}): path: {"->".join(path)}')
else:
print(f'\033[91mFailed reservation({src}->{dst}): path: {"->".join(path)}\033[0m')
else:
print('Warning: Hosts are connected to the same switch!')
def add_reservation(self, src, dst, duration, bandwidth, priority):
"""添加新的带宽预留请求,带优先级调度"""
with self.update_lock:
UPDATE_ENTRY = False
if self.current_reservations.get((src, dst)):
data = self.current_reservations[(src, dst)]
self.add_link_capacity(data['path'], data['bw'])
UPDATE_ENTRY = True
path = self.get_available_path(src, dst, bandwidth)
if path:
self._add_reservation(src, dst, duration, bandwidth, priority, path, UPDATE_ENTRY)
else:
previous_links_capacities = self.links_capacity.copy()
for reservation, data in self.current_reservations.items():
if reservation == (src, dst):
continue
if data['priority'] < priority:
self.add_link_capacity(data['path'], data['bw'])
path = self.get_available_path(src, dst, bandwidth)
if path:
self._add_reservation(src, dst, duration, bandwidth, priority, path, UPDATE_ENTRY)
for reservation, data in sorted(self.current_reservations.items(), key=lambda x: x[1]['priority'], reverse=True):
if data['priority'] < priority:
src_r, dst_r = reservation
new_path = self.get_available_path(src_r, dst_r, data['bw'])
if new_path:
self._add_reservation(src_r, dst_r, data['timeout'], data['bw'], data['priority'], new_path, True)
else:
self.sub_link_capacity(data['path'], data['bw'])
print(f'\033[91mDeleting allocation {src_r}->{dst_r} due to a higher priority allocation!\033[0m')
self.del_reservation(src_r, dst_r)
else:
self.links_capacity = previous_links_capacities
if UPDATE_ENTRY:
data = self.current_reservations[(src, dst)]
self.sub_link_capacity(data['path'], data['bw'])
print('Deleting new allocation. Does not fit anymore!')
self.del_reservation(src, dst)
print('\033[91mRESERVATION FAILURE: no bandwidth available!\033[0m')
def del_reservation(self, src, dst):
"""删除一个预留并恢复链路带宽"""
entry = self.current_reservations.get((src, dst))
if entry:
sw_gw = self.topo.get_host_gateway_name(src)
self.controllers[sw_gw].table_delete_match('FEC_tbl', entry['match'])
self.add_link_capacity(entry['path'], entry['bw'])
del self.current_reservations[(src, dst)]
print(f'\nRSVP Deleted/Expired Reservation({src}->{dst}): path: {"->".join(entry["path"])}')
else:
print(f'No entry for {src} -> {dst}')
def del_all_reservations(self):
"""删除所有预留请求"""
with self.update_lock:
for src, dst in list(self.current_reservations.keys()):
self.del_reservation(src, dst)
# 主程序入口
if __name__ == '__main__':
controller = RSVPController() # 实例化控制器
controller.set_mpls_tbl_labels() # 初始化 MPLS 转发表项
cli = RSVPCLI(controller) # 启动交互式命令行接口network.py1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51from p4utils.mininetlib.network_API import NetworkAPI
net = NetworkAPI()
# Network general options
net.setLogLevel('info')
net.setCompiler(p4rt=True)
# Network definition
net.addP4RuntimeSwitch('s1')
net.addP4RuntimeSwitch('s2')
net.addP4RuntimeSwitch('s3')
net.addP4RuntimeSwitch('s4')
net.addP4RuntimeSwitch('s5')
net.addP4RuntimeSwitch('s6')
net.addP4RuntimeSwitch('s7')
net.setP4SourceAll('rsvp.p4')
net.addHost('h1')
net.addHost('h2')
net.addHost('h3')
net.addHost('h4')
net.addHost('h5')
net.addHost('h6')
net.addLink("h1", "s1")
net.addLink("h2", "s1")
net.addLink("s1", "s2")
net.addLink("s1", "s3")
net.addLink("s2", "s4")
net.addLink("s3", "s4")
net.addLink("s2", "s5")
net.addLink("s3", "s6")
net.addLink("s4", "s5")
net.addLink("s4", "s6")
net.addLink("s5", "s7")
net.addLink("s6", "s7")
net.addLink("s7", "h5")
net.addLink("s7", "h6")
net.addLink("h3", "s3")
net.addLink("h4", "s5")
net.setBwAll(10)
# Assignment strategy
net.l3()
# Nodes general options
net.disablePcapDumpAll()
net.enableLogAll()
net.enableCli()
net.startNetwork()
P4仿真(以rsvp功能为例)
- 启动网络拓扑
sudo p4run
- 启动控制器
python rsvp_controller.py
- 在
mininetCLI命令窗口输入h1 ping h6,发现无法通信
- 在
rsvpCLI命令窗口输入add_reservation h1 h6 20 5和`add_reservation h6 h1 20 5
- 再次在
mininetCLI命令窗口输入h1 ping h6,发现成功通信
P4 07-RSVP
http://example.com/2025/08/04/P4 07-RSVP/