OpenFlow 1.0.0: A Detailed Walkthrough of the OF1.0 Protocol Code with Wireless Extensions (main)

openflow1.0.0 is a wireless-domain extension of the OpenFlow protocol, proposed by the Stanford team in 2008 so that APs (access points) could fit into SDN systems. It is based on OpenFlow 1.0, parses and processes IEEE 802.11 frames, and uses Click elements to implement forwarding from the AP to the controller.

This article starts from the main function and walks through several of the code's framework-level functions.

The main program lives in udatapath.c.

We begin with the main function:
int
main(int argc, char *argv[])
{
    int n_listeners;
    int error;
    int i;

    set_program_name(argv[0]);    /* set the program name */
    register_fault_handlers();    /* register signal-fault handlers */
    time_init();
    vlog_init();
    parse_options(argc, argv);    /* parse command-line options */
    signal(SIGPIPE, SIG_IGN);

Let's first look at the parse_options function, which defines the command-line options this program accepts.

parse_options

This is not the complete function; we focus on the option list and the handling code for each option.

static void
parse_options(int argc, char *argv[])
{
    ...
    static struct option long_options[] = {
        {"interfaces", required_argument, 0, 'i'},
        {"local-port", required_argument, 0, 'L'},
        {"no-local-port", no_argument, 0, OPT_NO_LOCAL_PORT},
        {"datapath-id", required_argument, 0, 'd'},
        {"verbose", optional_argument, 0, 'v'},
        {"help", no_argument, 0, 'h'},
        {"version", no_argument, 0, 'V'},
        {"no-slicing", no_argument, 0, OPT_NO_SLICING},
        {"mfr-desc", required_argument, 0, OPT_MFR_DESC},
        {"hw-desc", required_argument, 0, OPT_HW_DESC},
        {"sw-desc", required_argument, 0, OPT_SW_DESC},
        {"dp_desc", required_argument, 0, OPT_DP_DESC},
        {"serial_num", required_argument, 0, OPT_SERIAL_NUM},
        DAEMON_LONG_OPTIONS,
        {0, 0, 0, 0},
    };
    ...   /* declarations and short_options setup elided */

    for (;;) {
        int indexptr;
        int c;

        c = getopt_long(argc, argv, short_options, long_options, &indexptr);
        if (c == -1) {
            break;
        }

        switch (c) {
        case 'd':
            if (strlen(optarg) != 12
                || strspn(optarg, "0123456789abcdefABCDEF") != 12) {
                ofp_fatal(0, "argument to -d or --datapath-id must be "
                          "exactly 12 hex digits");
            }
            dpid = strtoll(optarg, NULL, 16);
            if (!dpid) {
                ofp_fatal(0, "argument to -d or --datapath-id must "
                          "be nonzero");
            }
            break;

        case 'h':
            usage();

        case 'V':
            printf("%s %s compiled "__DATE__" "__TIME__"\n",
                   program_name, VERSION BUILDNR);
            exit(EXIT_SUCCESS);

        case 'v':
            vlog_set_verbosity(optarg);
            break;

        case 'i':
            if (!port_list) {
                port_list = optarg;
            } else {
                port_list = xasprintf("%s,%s", port_list, optarg);
            }
            break;

        case 'L':
            local_port = optarg;
            break;

        case OPT_NO_LOCAL_PORT:
            local_port = NULL;
            break;

        case OPT_MFR_DESC:
            strncpy(mfr_desc, optarg, sizeof mfr_desc);
            break;

        case OPT_HW_DESC:
            strncpy(hw_desc, optarg, sizeof hw_desc);
            break;

        case OPT_SW_DESC:
            strncpy(sw_desc, optarg, sizeof sw_desc);
            break;

        case OPT_DP_DESC:
            strncpy(dp_desc, optarg, sizeof dp_desc);
            break;

        case OPT_SERIAL_NUM:
            strncpy(serial_num, optarg, sizeof serial_num);
            break;

        case OPT_NO_SLICING:
            num_queues = 0;
            break;

        DAEMON_OPTION_HANDLERS

        }
    }
    free(short_options);
}

Back in main, the program first sanity-checks its command-line arguments:

if (argc - optind < 1) {
    ofp_fatal(0, "at least one listener argument is required; "
              "use --help for usage");
}
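
A note on this check: after getopt_long consumes the options, optind indexes the first non-option argument, so argc - optind counts the positional arguments (here, the listener names). A minimal, self-contained sketch of this convention; the option table below is illustrative, not the datapath's own:

#include <getopt.h>
#include <stdio.h>

int main(int argc, char *argv[])
{
    static struct option longopts[] = {
        {"verbose", no_argument, 0, 'v'},
        {0, 0, 0, 0},
    };
    int c;

    while ((c = getopt_long(argc, argv, "v", longopts, NULL)) != -1) {
        /* option handling elided */
    }

    /* optind now indexes the first non-option argument; for the datapath
     * these are the listener names passed on the command line. */
    for (int i = optind; i < argc; i++) {
        printf("listener: %s\n", argv[i]);
    }
    return 0;
}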

Then it creates a new datapath:

error = dp_new(&dp, dpid);

Let's look at the dp_new function.

dp_new

int
dp_new(struct datapath **dp_, uint64_t dpid)
{
    struct datapath *dp;

    dp = calloc(1, sizeof *dp);
    if (!dp) {
        return ENOMEM;
    }

    dp->last_timeout = time_now();
    list_init(&dp->remotes);
    dp->listeners = NULL;
    dp->n_listeners = 0;
    dp->id = dpid <= UINT64_C(0xffffffffffff) ? dpid : gen_datapath_id();
    dp->chain = chain_create(dp);   /* creates and returns a new chain */
    if (!dp->chain) {
        VLOG_ERR("could not create chain");
        free(dp);
        return ENOMEM;
    }

    list_init(&dp->port_list);
    dp->flags = 0;
    dp->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;

    if (strlen(dp_desc) > 0)    /* use the comment, if specified */
        strncpy(dp->dp_desc, dp_desc, sizeof dp->dp_desc);
    else                        /* else, just use "$HOSTNAME pid=$$" */
    {
        char hostnametmp[DESC_STR_LEN];
        gethostname(hostnametmp, sizeof hostnametmp);
        snprintf(dp->dp_desc, sizeof dp->dp_desc, "%s pid=%u",
                 hostnametmp, getpid());
    }

    *dp_ = dp;
    return 0;
}

This function initializes the members of the datapath structure. The most important member is chain, so let's look at the chain_create function.

/* Creates and returns a new chain.  Returns NULL if the chain cannot be
 * created. */
struct sw_chain *chain_create(struct datapath *dp)
{
    struct sw_chain *chain = calloc(1, sizeof *chain);   /* allocate memory */
    if (chain == NULL)
        return NULL;

    chain->dp = dp;
    if (add_table(chain, table_hash2_create(0x1EDC6F41, TABLE_HASH_MAX_FLOWS,
                                            0x741B8CD7, TABLE_HASH_MAX_FLOWS),
                  0)
        || add_table(chain, table_linear_create(TABLE_LINEAR_MAX_FLOWS), 0)
        || add_table(chain, table_linear_create(TABLE_LINEAR_MAX_FLOWS), 1)) {
        chain_destroy(chain);
        return NULL;
    }

    return chain;
}
/* Attempts to append 'table' to the set of tables in 'chain'.  Returns 0 or
 * negative error.  If 'table' is null it is assumed that table creation failed
 * due to out-of-memory. */
static int add_table(struct sw_chain *chain, struct sw_table *table, int emerg)
{
    if (table == NULL)
        return -ENOMEM;
    if (chain->n_tables >= CHAIN_MAX_TABLES) {
        VLOG_ERR("too many tables in chain\n");
        table->destroy(table);
        return -ENOBUFS;
    }
    if (emerg)
        chain->emerg_table = table;
    else
        chain->tables[chain->n_tables++] = table;
    return 0;
}
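
struct sw_chain itself is never shown in this post; the sketch below reconstructs its shape purely from the chain_create and add_table code above (the field names follow that usage; the CHAIN_MAX_TABLES value is a placeholder). Note the resulting lookup order: the exact-match hash tables are appended before the linear wildcard table, so they are consulted first.

/* Sketch of sw_chain reconstructed from chain_create()/add_table() usage
 * above; not copied from the project headers. */
#define CHAIN_MAX_TABLES 32                    /* placeholder value */

struct sw_chain {
    int n_tables;                              /* regular tables in use */
    struct sw_table *tables[CHAIN_MAX_TABLES]; /* hash tables, then linear */
    struct sw_table *emerg_table;              /* table for emergency flows */
    struct datapath *dp;                       /* owning datapath */
};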

Back in the main function, the following block establishes and connects the listening virtual connections; we won't go into detail here.

n_listeners = 0;
for (i = optind; i < argc; i++) {
    const char *pvconn_name = argv[i];
    struct pvconn *pvconn;
    int retval;

    retval = pvconn_open(pvconn_name, &pvconn);
    if (!retval || retval == EAGAIN) {
        dp_add_pvconn(dp, pvconn);
        n_listeners++;
    } else {
        ofp_error(retval, "opening %s", pvconn_name);
    }
}
if (!n_listeners) {
    ofp_fatal(0, "could not listen for any connections");
}

if (port_list) {
    add_ports(dp, port_list);
}
if (local_port) {
    error = dp_add_local_port(dp, local_port, 0);
    if (error) {
        ofp_fatal(error, "failed to add local port %s", local_port);
    }
}

error = vlog_server_listen(NULL, NULL);
if (error) {
    ofp_fatal(error, "could not listen for vlog connections");
}

If an instance of the same program is already running, this one exits; otherwise it runs as a daemon:

die_if_already_running();
daemonize();

daemonize(void)

void
daemonize(void)
{
    if (detach) {   /* detach is a bool: run as a background daemon? */
        char c = 0;
        int fds[2];
        if (pipe(fds) < 0) {
            ofp_fatal(errno, "pipe failed");
        }

        switch (fork()) {
        default:
            /* Parent process: wait for child to create pidfile, then exit. */
            close(fds[1]);
            fatal_signal_fork();
            if (read(fds[0], &c, 1) != 1) {
                ofp_fatal(errno, "daemon child failed to signal startup");
            }
            exit(0);

        case 0:
            /* Child process. */
            close(fds[0]);
            make_pidfile();          /* write the pidfile */
            write(fds[1], &c, 1);    /* signal the parent that startup is done */
            close(fds[1]);
            setsid();
            chdir("/");
            break;

        case -1:
            /* Error. */
            ofp_fatal(errno, "could not fork");
            break;
        }
    } else {
        make_pidfile();
    }
}

Next, the main function sets up a socket channel. As mentioned above, the AP interacts with the controller or an upstream OpenFlow switch through the Click model: Click behaves like a switch that exchanges data, keeping traffic isolated on different ports. The openflow1.0.0 program opens a socket and ships packets to the Click module over UDP; the channel is created by the make_socket function.

/*add the monitor function*/
tp = realloc(tp, sizeof(struct thread_para));
tp->dp = dp;
make_socket(tp);

make_socket

thread_para is a structure holding a border port and a datapath, defined as:

struct thread_para {
    struct border_port *bp;
    struct datapath *dp;
};
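
struct border_port is not defined anywhere in this post. From the only usage we see (tp->bp->fd = sockfd in make_socket below), it at least carries the socket descriptor; a minimal assumed layout:

/* Assumed minimal layout; only the 'fd' member is evidenced in this post. */
struct border_port {
    int fd;   /* UDP socket shared with the Click module */
};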
/* server socket using UDP */
int make_socket(struct thread_para *tp) {

    printf("enter make socket function\n");
    const int SERV_PORT = 5555;
    pthread_t tid;
    int sockfd;
    struct sockaddr_in servaddr;

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(SERV_PORT);

    if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
    {
        perror("socket error");
        exit(1);
    }
    //_fd = sockfd;
    struct border_port *bp = NULL;   /* NULL so the realloc below acts as malloc */
    bp = realloc(bp, sizeof(struct border_port));
    tp->bp = bp;
    tp->bp->fd = sockfd;

    if (bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)))
    {
        perror("bind error");
        exit(1);
    }

    printf("create server socket\n");
    /* create a thread to send and receive messages from the Click module */
    if (pthread_create(&tid, NULL, socket_handler, tp) != 0)
        pdie("pthread_create");

    return 0;
}
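
The socket_handler thread routine passed to pthread_create is not shown in this post. As a rough sketch only, assuming it blocks on the UDP socket and hands each datagram from the Click module to the datapath (the processing step is entirely an assumption):

#include <errno.h>
#include <sys/socket.h>

/* Hypothetical sketch of socket_handler; the real body is not shown here. */
static void *socket_handler(void *arg)
{
    struct thread_para *tp = arg;   /* carries the socket fd and the datapath */
    char buf[2048];

    for (;;) {
        ssize_t n = recv(tp->bp->fd, buf, sizeof buf, 0);
        if (n < 0) {
            if (errno == EINTR)
                continue;           /* interrupted by a signal: retry */
            break;                  /* unrecoverable socket error */
        }
        /* hand the datagram from the Click module to the datapath here */
    }
    return NULL;
}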

Once the daemon has been set up, main enters an infinite loop that runs three functions: dp_run, dp_wait, and poll_block.

for (;;) {
    dp_run(dp);
    dp_wait(dp);
    poll_block();
}
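
This is the run/wait/block idiom used throughout the reference code. An annotated restatement of the loop (the comments are our reading of the pattern, not taken from the source):

for (;;) {
    dp_run(dp);      /* do all pending work: expire flows, read the ports,
                        service controller connections */
    dp_wait(dp);     /* register the fds and timers to be woken up for */
    poll_block();    /* sleep until one of the registered events fires */
}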

We will mainly look at dp_run; dp_wait and poll_block are not covered in detail.

dp_run

As its name suggests, this function runs one datapath. The most important functions it calls are netdev_recv, fwd_port_input, and remote_run, which will be analyzed in detail below.

void
dp_run(struct datapath *dp)
{
    time_t now = time_now();
    struct sw_port *p, *pn;
    struct remote *r, *rn;
    struct ofpbuf *buffer = NULL;
    size_t i;

    if (now != dp->last_timeout) {
        struct list deleted = LIST_INITIALIZER(&deleted);
        struct sw_flow *f, *n;

        chain_timeout(dp->chain, &deleted);
        LIST_FOR_EACH_SAFE (f, n, struct sw_flow, node, &deleted) {
            dp_send_flow_end(dp, f, f->reason);
            list_remove(&f->node);
            flow_free(f);
        }
        dp->last_timeout = now;
    }
    poll_timer_wait(1000);

    /*
     * The block above expires timed-out flows;
     * dp_send_flow_end sends a flow-removed message to the controller
     * (analyzed further below).
     */

    LIST_FOR_EACH_SAFE (p, pn, struct sw_port, node, &dp->port_list) {
        int error;

        if (!buffer) {
            /* Allocate buffer with some headroom to add headers in forwarding
             * to the controller or adding a vlan tag, plus an extra 2 bytes to
             * allow IP headers to be aligned on a 4-byte boundary. */
            const int headroom = 128 + 2;
            const int hard_header = VLAN_ETH_HEADER_LEN;
            const int mtu = netdev_get_mtu(p->netdev);
            buffer = ofpbuf_new(headroom + hard_header + mtu);
            buffer->data = (char*)buffer->data + headroom;
        }

        /*
         * 'buffer' temporarily holds the packet received from the netdev;
         * ofpbuf_new allocates an empty buffer (see below).
         */
        error = netdev_recv(p->netdev, buffer);
        if (!error) {
            p->rx_packets++;
            p->rx_bytes += buffer->size;
            fwd_port_input(dp, buffer, p);
            buffer = NULL;
        } else if (error != EAGAIN) {
            VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
                        netdev_get_name(p->netdev), strerror(error));
        }
    }
    ofpbuf_delete(buffer);

    /*
     * Maintain the secure channels to the controller.
     */

    /* Talk to remotes. */
    LIST_FOR_EACH_SAFE (r, rn, struct remote, node, &dp->remotes) {
        remote_run(dp, r);
    }

    for (i = 0; i < dp->n_listeners; ) {
        struct pvconn *pvconn = dp->listeners[i];
        struct vconn *new_vconn;
        int retval = pvconn_accept(pvconn, OFP_VERSION, &new_vconn);
        if (!retval) {
            remote_create(dp, rconn_new_from_vconn("passive", new_vconn));
        } else if (retval != EAGAIN) {
            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
            dp->listeners[i] = dp->listeners[--dp->n_listeners];
            continue;
        }
        i++;
    }
}

ofpbuf_new allocates an empty buffer of the given size.

struct ofpbuf *
ofpbuf_new(size_t size)
{
    struct ofpbuf *b = xmalloc(sizeof *b);
    ofpbuf_init(b, size);
    return b;
}
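
A short usage sketch mirroring the allocation in dp_run above: reserving headroom up front lets headers be prepended later (via ofpbuf_push_uninit, shown further below) without copying the packet body. The mtu value here is a stand-in for what netdev_get_mtu returns:

const int headroom = 128 + 2;        /* the +2 aligns IP headers on 4 bytes */
const int mtu = 1500;                /* stand-in for netdev_get_mtu(...) */
struct ofpbuf *b = ofpbuf_new(headroom + VLAN_ETH_HEADER_LEN + mtu);
b->data = (char *)b->data + headroom;
/* ofpbuf_push_uninit(b, n) can later reclaim up to 'headroom' bytes */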

netdev_recv

netdev_recv stores a packet received on the network device into the buffer.

int
netdev_recv(struct netdev *netdev, struct ofpbuf *buffer)
{
    ssize_t n_bytes;
    struct sockaddr_ll sll;
    socklen_t sll_len;

    assert(buffer->size == 0);
    assert(ofpbuf_tailroom(buffer) >= ETH_TOTAL_MIN);

    /* prepare to call recvfrom */
    memset(&sll, 0, sizeof sll);
    sll_len = sizeof sll;

    /* cannot execute recvfrom over a tap device */
    if (!strncmp(netdev->name, "tap", 3)) {
        do {
            n_bytes = read(netdev->tap_fd, ofpbuf_tail(buffer),
                           (ssize_t)ofpbuf_tailroom(buffer));
        } while (n_bytes < 0 && errno == EINTR);
    }
    else {
        do {
            n_bytes = recvfrom(netdev->tap_fd, ofpbuf_tail(buffer),
                               (ssize_t)ofpbuf_tailroom(buffer), 0,
                               (struct sockaddr *)&sll, &sll_len);
        } while (n_bytes < 0 && errno == EINTR);
    }
    if (n_bytes < 0) {
        if (errno != EAGAIN) {
            VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s",
                         strerror(errno), netdev->name);
        }
        return errno;
    } else {
        /* we have multiple raw sockets at the same interface, so we also
         * receive what others send, and need to filter them out.
         * TODO(yiannisy): can we install this as a BPF at kernel? */
        if (sll.sll_pkttype == PACKET_OUTGOING) {
            return EAGAIN;
        }

        buffer->size += n_bytes;

        /* When the kernel internally sends out an Ethernet frame on an
         * interface, it gives us a copy *before* padding the frame to the
         * minimum length.  Thus, when it sends out something like an ARP
         * request, we see a too-short frame.  So pad it out to the minimum
         * length. */
        pad_to_minimum_length(buffer);
        return 0;
    }
}

This function uses recvfrom to store the data received on the network device at the buffer's tail (ofpbuf_tail). Once a packet has been received, fwd_port_input is the first function to process it.

fwd_port_input

/* 'buffer' was received on 'p', which may be a physical switch port or a
 * null pointer.  Process it according to 'dp''s flow table, sending it up to
 * the controller if no flow matches.  Takes ownership of 'buffer'. */
void fwd_port_input(struct datapath *dp, struct ofpbuf *buffer,
                    struct sw_port *p)
{
    if (run_flow_through_tables(dp, buffer, p)) {
        dp_output_control(dp, buffer, p->port_no,
                          dp->miss_send_len, OFPR_NO_MATCH);
    }
}

The packet is first run through the flow tables by run_flow_through_tables; if no flow matches, it is handed to dp_output_control to be sent to the controller.

run_flow_through_tables

/* 'buffer' was received on 'p', which may be a physical switch port or a
 * null pointer.  Process it according to 'dp''s flow table.  Returns 0 if
 * successful, in which case 'buffer' is destroyed, or -ESRCH if there is no
 * matching flow, in which case 'buffer' still belongs to the caller. */
int run_flow_through_tables(struct datapath *dp, struct ofpbuf *buffer,
                            struct sw_port *p)
{
    struct sw_flow_key key;
    struct sw_flow *flow;

    key.wildcards = 0;
    if (flow_extract(buffer, p ? p->port_no : OFPP_NONE, &key.flow)
        && (dp->flags & OFPC_FRAG_MASK) == OFPC_FRAG_DROP) {
        /* Drop fragment. */
        ofpbuf_delete(buffer);
        return 0;
    }

    if (p && p->config & (OFPPC_NO_RECV | OFPPC_NO_RECV_STP)
        && p->config & (!eth_addr_equals(key.flow.dl_dst, stp_eth_addr)
                        ? OFPPC_NO_RECV : OFPPC_NO_RECV_STP)) {
        ofpbuf_delete(buffer);
        return 0;
    }

    flow = chain_lookup(dp->chain, &key, 0);
    if (flow != NULL) {
        flow_used(flow, buffer);
        execute_actions(dp, buffer, &key, flow->sf_acts->actions,
                        flow->sf_acts->actions_len, false);
        return 0;
    } else {
        return -ESRCH;
    }
}

First, flow_extract parses the packet. It returns 1 if the packet is an IP fragment and 0 otherwise; if it is a fragment and the datapath is configured to drop fragments (OFPC_FRAG_DROP), the buffer is deleted and processing stops. Along the way, the function stores the packet's header fields in the struct flow embedded in the lookup key. The flow-table entries themselves are defined by struct sw_flow:

struct sw_flow {
    struct sw_flow_key key;

    uint64_t cookie;         /* Opaque controller-issued identifier. */
    uint16_t priority;       /* Only used on entries with wildcards. */
    uint16_t idle_timeout;   /* Idle time before discarding (seconds). */
    uint16_t hard_timeout;   /* Hard expiration time (seconds) */
    uint64_t used;           /* Last used time. */
    uint64_t created;        /* When the flow was created. */
    uint64_t packet_count;   /* Number of packets seen. */
    uint64_t byte_count;     /* Number of bytes seen. */
    uint8_t reason;          /* Reason flow removed (one of OFPRR_*). */
    uint8_t send_flow_rem;   /* Send a flow removed to the controller */
    uint8_t emerg_flow;      /* Emergency flow indicator */

    struct sw_flow_actions *sf_acts;

    /* Private to table implementations. */
    struct list node;
    struct list iter_node;
    unsigned long int serial;
};

The packet-parsing function is as follows:

/* Returns 1 if 'packet' is an IP fragment, 0 otherwise. */
int
flow_extract(struct ofpbuf *packet, uint16_t in_port, struct flow *flow)
{
    struct ofpbuf b = *packet;
    struct eth_header *eth;
    int retval = 0;

    memset(flow, 0, sizeof *flow);
    flow->dl_vlan = htons(OFP_VLAN_NONE);
    flow->in_port = htons(in_port);

    packet->l2 = b.data;
    packet->l3 = NULL;
    packet->l4 = NULL;
    packet->l7 = NULL;

    eth = pull_eth(&b);
    if (eth) {
        if (ntohs(eth->eth_type) >= OFP_DL_TYPE_ETH2_CUTOFF) {
            /* This is an Ethernet II frame */
            flow->dl_type = eth->eth_type;
        } else {
            /* This is an 802.2 frame */
            struct llc_header *llc = ofpbuf_at(&b, 0, sizeof *llc);
            struct snap_header *snap = ofpbuf_at(&b, sizeof *llc,
                                                 sizeof *snap);
            if (llc == NULL) {
                return 0;
            }
            if (snap
                && llc->llc_dsap == LLC_DSAP_SNAP
                && llc->llc_ssap == LLC_SSAP_SNAP
                && llc->llc_cntl == LLC_CNTL_SNAP
                && !memcmp(snap->snap_org, SNAP_ORG_ETHERNET,
                           sizeof snap->snap_org)) {
                flow->dl_type = snap->snap_type;
                ofpbuf_pull(&b, LLC_SNAP_HEADER_LEN);
            } else {
                flow->dl_type = htons(OFP_DL_TYPE_NOT_ETH_TYPE);
                ofpbuf_pull(&b, sizeof(struct llc_header));
            }
        }

        /* Check for a VLAN tag */
        if (flow->dl_type == htons(ETH_TYPE_VLAN)) {
            struct vlan_header *vh = pull_vlan(&b);
            if (vh) {
                flow->dl_type = vh->vlan_next_type;
                flow->dl_vlan = vh->vlan_tci & htons(VLAN_VID_MASK);
                flow->dl_vlan_pcp = (uint8_t)((ntohs(vh->vlan_tci) >> VLAN_PCP_SHIFT)
                                              & VLAN_PCP_BITMASK);
            }
        }
        memcpy(flow->dl_src, eth->eth_src, ETH_ADDR_LEN);
        memcpy(flow->dl_dst, eth->eth_dst, ETH_ADDR_LEN);

        packet->l3 = b.data;
        if (flow->dl_type == htons(ETH_TYPE_IP)) {
            const struct ip_header *nh = pull_ip(&b);
            if (nh) {
                flow->nw_tos = nh->ip_tos & 0xfc;
                flow->nw_proto = nh->ip_proto;
                flow->nw_src = nh->ip_src;
                flow->nw_dst = nh->ip_dst;
                packet->l4 = b.data;
                if (!IP_IS_FRAGMENT(nh->ip_frag_off)) {
                    if (flow->nw_proto == IP_TYPE_TCP) {
                        const struct tcp_header *tcp = pull_tcp(&b);
                        if (tcp) {
                            flow->tp_src = tcp->tcp_src;
                            flow->tp_dst = tcp->tcp_dst;
                            packet->l7 = b.data;
                        } else {
                            /* Avoid tricking other code into thinking that
                             * this packet has an L4 header. */
                            flow->nw_proto = 0;
                        }
                    } else if (flow->nw_proto == IP_TYPE_UDP) {
                        const struct udp_header *udp = pull_udp(&b);
                        if (udp) {
                            flow->tp_src = udp->udp_src;
                            flow->tp_dst = udp->udp_dst;
                            packet->l7 = b.data;
                        } else {
                            /* Avoid tricking other code into thinking that
                             * this packet has an L4 header. */
                            flow->nw_proto = 0;
                        }
                    } else if (flow->nw_proto == IP_TYPE_ICMP) {
                        const struct icmp_header *icmp = pull_icmp(&b);
                        if (icmp) {
                            flow->icmp_type = htons(icmp->icmp_type);
                            flow->icmp_code = htons(icmp->icmp_code);
                            packet->l7 = b.data;
                        } else {
                            /* Avoid tricking other code into thinking that
                             * this packet has an L4 header. */
                            flow->nw_proto = 0;
                        }
                    }
                } else {
                    retval = 1;
                }
            }
        } else if (flow->dl_type == htons(ETH_TYPE_ARP)) {
            const struct arp_eth_header *arp = pull_arp(&b);
            if (arp) {
                if (arp->ar_pro == htons(ARP_PRO_IP) && arp->ar_pln == IP_ADDR_LEN) {
                    flow->nw_src = arp->ar_spa;
                    flow->nw_dst = arp->ar_tpa;
                }
                flow->nw_proto = ntohs(arp->ar_op) & 0xff;
            }
        }
    }
    return retval;
}

If the packet is not an IP fragment, its header information has now been stored in the struct flow.

Next, chain_lookup searches the flow tables for the entry matching this key.

/* Searches 'chain' for a flow matching 'key', which must not have any wildcard
 * fields.  Returns the flow if successful, otherwise a null pointer. */
struct sw_flow *
chain_lookup(struct sw_chain *chain, const struct sw_flow_key *key, int emerg)
{
    int i;

    assert(!key->wildcards);

    if (emerg) {
        struct sw_table *t = chain->emerg_table;
        struct sw_flow *flow = t->lookup(t, key);
        t->n_lookup++;
        if (flow) {
            t->n_matched++;
            return flow;
        }
    } else {
        for (i = 0; i < chain->n_tables; i++) {
            struct sw_table *t = chain->tables[i];
            struct sw_flow *flow = t->lookup(t, key);
            t->n_lookup++;
            if (flow) {
                t->n_matched++;
                return flow;
            }
        }
    }

    return NULL;
}

Finally, execute_actions executes the matched flow's actions.

execute_actions

The function's parameters are the datapath, the buffer, the key, the actions, and actions_len.

As the call site shows, the actions and actions_len come from the flow's sw_flow_actions structure.
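
struct sw_flow_actions is not shown in this post; the layout implied by flow->sf_acts->actions and flow->sf_acts->actions_len in the call above would be roughly (field names inferred from usage, not quoted from the headers):

/* Sketch of sw_flow_actions as implied by the call site above. */
struct sw_flow_actions {
    size_t actions_len;                  /* total size of actions[] in bytes */
    struct ofp_action_header actions[];  /* variable-length action list */
};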

/* Execute a list of actions against 'buffer'. */
void execute_actions(struct datapath *dp, struct ofpbuf *buffer,
                     struct sw_flow_key *key,
                     const struct ofp_action_header *actions, size_t actions_len,
                     int ignore_no_fwd)
{
    /* Every output action needs a separate clone of 'buffer', but the common
     * case is just a single output action, so that doing a clone and then
     * freeing the original buffer is wasteful.  So the following code is
     * slightly obscure just to avoid that. */
    int prev_port;
    uint32_t prev_queue;
    size_t max_len = UINT16_MAX;
    uint16_t in_port = ntohs(key->flow.in_port);
    uint8_t *p = (uint8_t *)actions;

    prev_port = -1;
    prev_queue = 0;

    /* The action list was already validated, so we can be a bit looser
     * in our sanity-checking. */
    while (actions_len > 0) {
        struct ofp_action_header *ah = (struct ofp_action_header *)p;
        size_t len = ntohs(ah->len);

        if (prev_port != -1) {
            do_output(dp, ofpbuf_clone(buffer), in_port, max_len,
                      prev_port, prev_queue, ignore_no_fwd);
            prev_port = -1;
        }

        if (ah->type == htons(OFPAT_OUTPUT)) {
            struct ofp_action_output *oa = (struct ofp_action_output *)p;
            prev_port = ntohs(oa->port);
            prev_queue = 0;   /* using the default best-effort queue */
            max_len = ntohs(oa->max_len);
        } else if (ah->type == htons(OFPAT_ENQUEUE)) {
            struct ofp_action_enqueue *ea = (struct ofp_action_enqueue *)p;
            prev_port = ntohs(ea->port);
            prev_queue = ntohl(ea->queue_id);
            max_len = 0;   /* we will not send to the controller anyways - useless */
        } else {
            uint16_t type = ntohs(ah->type);

            if (type < ARRAY_SIZE(of_actions)) {
                execute_ofpat(buffer, key, ah, type);
            } else if (type == OFPAT_VENDOR) {
                execute_vendor(buffer, key, ah);
            }
        }

        p += len;
        actions_len -= len;
    }
    if (prev_port != -1) {
        do_output(dp, buffer, in_port, max_len, prev_port, prev_queue, ignore_no_fwd);
    } else {
        ofpbuf_delete(buffer);
    }
}

This function walks the action list while tracking the pending output port. OpenFlow built-in actions are executed by execute_ofpat and vendor actions by execute_vendor, while output and enqueue actions are ultimately carried out by do_output.

do_output

The function first checks the output port to decide whether the packet goes up to the controller or out through a switch port.

static void
do_output(struct datapath *dp, struct ofpbuf *buffer, int in_port,
          size_t max_len, int out_port, uint32_t queue_id,
          bool ignore_no_fwd)
{
    if (out_port != OFPP_CONTROLLER) {
        dp_output_port(dp, buffer, in_port, out_port, queue_id, ignore_no_fwd);
    } else {
        dp_output_control(dp, buffer, in_port, max_len, OFPR_ACTION);
    }
}
dp_output_port

Depending on out_port, the packet is handled by one of output_packet, output_all, or dp_output_control.

/** Takes ownership of 'buffer' and transmits it to 'out_port' on 'dp'.
 */
void
dp_output_port(struct datapath *dp, struct ofpbuf *buffer,
               int in_port, int out_port, uint32_t queue_id,
               bool ignore_no_fwd UNUSED)
{
    assert(buffer);
    switch (out_port) {
    case OFPP_IN_PORT:
        output_packet(dp, buffer, in_port, queue_id);
        break;

    case OFPP_TABLE: {
        struct sw_port *p = dp_lookup_port(dp, in_port);
        if (run_flow_through_tables(dp, buffer, p)) {
            ofpbuf_delete(buffer);
        }
        break;
    }

    case OFPP_FLOOD:
        output_all(dp, buffer, in_port, 1);
        break;

    case OFPP_ALL:
        output_all(dp, buffer, in_port, 0);
        break;

    case OFPP_CONTROLLER:
        dp_output_control(dp, buffer, in_port, UINT16_MAX, OFPR_ACTION);
        break;

    case OFPP_LOCAL:
    default:
        if (in_port == out_port) {
            VLOG_DBG_RL(&rl, "can't directly forward to input port");
            return;
        }
        output_packet(dp, buffer, out_port, queue_id);
        break;
    }
}
  • output_packet transmits the packet out of the corresponding network device via netdev_send.
  • output_all sends the packet out of all ports: internally it calls dp_output_port once for each port other than the input port.
dp_output_control
/* Takes ownership of 'buffer' and transmits it to 'dp''s controller.  If the
 * packet can be saved in a buffer, then only the first max_len bytes of
 * 'buffer' are sent; otherwise, all of 'buffer' is sent.  'reason' indicates
 * why 'buffer' is being sent.  'max_len' sets the maximum number of bytes that
 * the caller wants to be sent. */
void
dp_output_control(struct datapath *dp, struct ofpbuf *buffer, int in_port,
                  size_t max_len, int reason)
{
    struct ofp_packet_in *opi;
    size_t total_len;
    uint32_t buffer_id;

    buffer_id = save_buffer(buffer);
    total_len = buffer->size;
    if (buffer_id != UINT32_MAX && buffer->size > max_len) {
        buffer->size = max_len;
    }

    opi = ofpbuf_push_uninit(buffer, offsetof(struct ofp_packet_in, data));
    opi->header.version = OFP_VERSION;
    opi->header.type = OFPT_PACKET_IN;
    opi->header.length = htons(buffer->size);
    opi->header.xid = htonl(0);
    opi->buffer_id = htonl(buffer_id);
    opi->total_len = htons(total_len);
    opi->in_port = htons(in_port);
    opi->reason = reason;
    opi->pad = 0;
    send_openflow_buffer(dp, buffer, NULL);
}

The function uses ofpbuf_push_uninit to prepend the OFPT_PACKET_IN header to the buffer, then sends the buffer with send_openflow_buffer.

void *
ofpbuf_push_uninit(struct ofpbuf *b, size_t size)
{
    ofpbuf_prealloc_headroom(b, size);
    b->data = (char*)b->data - size;
    b->size += size;
    return b->data;
}

Before dissecting send_openflow_buffer, let's first look at the remote_run function mentioned earlier.

remote_run

This function maintains a remote endpoint over a virtual connection; my personal understanding is that this is what establishes the secure channel between the AP and the controller.

First, the remote structure represents a connection to a secure channel.

/* A connection to a secure channel. */
struct remote {
    struct list node;
    struct rconn *rconn;
#define TXQ_LIMIT 128   /* Max number of packets to queue for tx. */
    int n_txq;          /* Number of packets queued for tx on rconn. */

    /* Support for reliable, multi-message replies to requests.
     *
     * If an incoming request needs to have a reliable reply that might
     * require multiple messages, it can use remote_start_dump() to set up
     * a callback that will be called as buffer space for replies. */
    int (*cb_dump)(struct datapath *, void *aux);
    void (*cb_done)(void *aux);
    void *cb_aux;
};

The rconn structure is defined as a reliable connection to an OpenFlow switch or controller.

/* A reliable connection to an OpenFlow switch or controller.
 *
 * See the large comment in rconn.h for more information. */
struct rconn {
    enum state state;
    time_t state_entered;

    struct vconn *vconn;
    char *name;
    bool reliable;

    struct ofp_queue txq;

    int backoff;
    int max_backoff;
    time_t backoff_deadline;
    time_t last_received;
    time_t last_connected;
    unsigned int packets_sent;
    unsigned int seqno;

    /* In S_ACTIVE and S_IDLE, probably_admitted reports whether we believe
     * that the peer has made a (positive) admission control decision on our
     * connection.  If we have not yet been (probably) admitted, then the
     * connection does not reset the timer used for deciding whether the switch
     * should go into fail-open mode.
     *
     * last_admitted reports the last time we believe such a positive admission
     * control decision was made. */
    bool probably_admitted;
    time_t last_admitted;

    /* These values are simply for statistics reporting, not used directly by
     * anything internal to the rconn (or the secchan for that matter). */
    unsigned int packets_received;
    unsigned int n_attempted_connections, n_successful_connections;
    time_t creation_time;
    unsigned long int total_time_connected;

    /* If we can't connect to the peer, it could be for any number of reasons.
     * Usually, one would assume it is because the peer is not running or
     * because the network is partitioned.  But it could also be because the
     * network topology has changed, in which case the upper layer will need to
     * reassess it (in particular, obtain a new IP address via DHCP and find
     * the new location of the controller).  We set this flag when we suspect
     * that this could be the case. */
    bool questionable_connectivity;
    time_t last_questioned;

    /* Throughout this file, "probe" is shorthand for "inactivity probe".
     * When nothing has been received from the peer for a while, we send out
     * an echo request as an inactivity probe packet.  We should receive back
     * a response. */
    int probe_interval;   /* Secs of inactivity before sending probe. */

    /* Messages sent or received are copied to the monitor connections. */
#define MAX_MONITORS 8
    struct vconn *monitors[8];
    size_t n_monitors;

    /* Protocol statistical information. */
    struct ofpstat ofps_rcvd;
    struct ofpstat ofps_sent;

    uint32_t idle_echo_xid;
};

rconn in turn rides on a virtual connection; the concrete transport is supplied by a vconn provider, and the structure records the connection's state, version, IP, and receive/send statistics.

/* Active virtual connection to an OpenFlow device.
 *
 * This structure should be treated as opaque by vconn implementations. */
struct vconn {
    struct vconn_class *class;
    int state;
    int error;
    int min_version;
    int version;
    uint32_t ip;
    char *name;
    bool reconnectable;
    struct ofpstat ofps_rcvd;
    struct ofpstat ofps_sent;
};

In short, a remote can be regarded as the AP's secure channel.

Before dissecting remote_run, one more structure needs introducing: sender. A sender pairs a remote with a transaction ID, identifying the origin of a received OpenFlow message.

/* The origin of a received OpenFlow message, to enable sending a reply. */
struct sender {
    struct remote *remote;   /* The device that sent the message. */
    uint32_t xid;            /* The OpenFlow transaction ID. */
};

remote_run's main job is to keep the secure channel running and to pick up messages sent by the controller.

static void
remote_run(struct datapath *dp, struct remote *r)
{
    int i;

    rconn_run(r->rconn);

    /* Do some remote processing, but cap it at a reasonable amount so that
     * other processing doesn't starve. */
    for (i = 0; i < 50; i++) {
        if (!r->cb_dump) {
            struct ofpbuf *buffer;
            struct ofp_header *oh;

            buffer = rconn_recv(r->rconn);
            if (!buffer) {
                break;
            }

            if (buffer->size >= sizeof *oh) {
                struct sender sender;

                oh = (struct ofp_header *)buffer->data;
                sender.remote = r;
                sender.xid = oh->xid;
                fwd_control_input(dp, &sender, buffer->data, buffer->size);
            } else {
                VLOG_WARN_RL(&rl, "received too-short OpenFlow message");
            }
            ofpbuf_delete(buffer);
        } else {
            if (r->n_txq < TXQ_LIMIT) {
                int error = r->cb_dump(dp, r->cb_aux);
                if (error <= 0) {
                    if (error) {
                        VLOG_WARN_RL(&rl, "dump callback error: %s",
                                     strerror(-error));
                    }
                    r->cb_done(r->cb_aux);
                    r->cb_dump = NULL;
                }
            } else {
                break;
            }
        }
    }

    if (!rconn_is_alive(r->rconn)) {
        remote_destroy(r);
    }
}
  • rconn_run keeps the connection running (reconnecting and doing housekeeping as needed).
  • rconn_recv receives a packet from the connection and stores it in buffer.
  • fwd_control_input parses the received message and dispatches the next step.

fwd_control_input

  • It extracts the header from the message and dispatches on the message type.
  • The per-type handling function is stored in the handler pointer (see the sketch after this list).
  • The individual handler functions will be introduced one by one in later articles.
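
Every handler shares the signature of the handler pointer declared in the code below. Purely as an illustration (the real recv_echo_request body is not shown in this post, and the make_echo_reply helper is assumed), a minimal handler could look like:

/* Hypothetical minimal handler matching fwd_control_input's 'handler'
 * pointer type; make_echo_reply is assumed, not shown in this post. */
static int
recv_echo_request(struct datapath *dp, const struct sender *sender,
                  const void *oh)
{
    /* build an OFPT_ECHO_REPLY from the request and send it back over
     * the sender's secure channel */
    return send_openflow_buffer(dp, make_echo_reply(oh), sender);
}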
/* 'msg', which is 'length' bytes long, was received from the control path.
 * Apply it to 'chain'. */
int
fwd_control_input(struct datapath *dp, const struct sender *sender,
                  const void *msg, size_t length)
{
    int (*handler)(struct datapath *, const struct sender *, const void *);
    struct ofp_header *oh;
    size_t min_size;

    /* Check encapsulated length. */
    oh = (struct ofp_header *) msg;
    if (ntohs(oh->length) > length) {
        return -EINVAL;
    }
    assert(oh->version == OFP_VERSION);

    /* Figure out how to handle it. */
    switch (oh->type) {
    case OFPT_BARRIER_REQUEST:
        min_size = sizeof(struct ofp_header);
        handler = recv_barrier_request;
        break;
    case OFPT_FEATURES_REQUEST:
        min_size = sizeof(struct ofp_header);
        handler = recv_features_request;
        break;
    case OFPT_GET_CONFIG_REQUEST:
        min_size = sizeof(struct ofp_header);
        handler = recv_get_config_request;
        break;
    case OFPT_SET_CONFIG:
        min_size = sizeof(struct ofp_switch_config);
        handler = recv_set_config;
        break;
    case OFPT_PACKET_OUT:
        min_size = sizeof(struct ofp_packet_out);
        handler = recv_packet_out;
        break;
    case OFPT_FLOW_MOD:
        min_size = sizeof(struct ofp_flow_mod);
        handler = recv_flow;
        break;
    case OFPT_PORT_MOD:
        min_size = sizeof(struct ofp_port_mod);
        handler = recv_port_mod;
        break;
    case OFPT_STATS_REQUEST:
        min_size = sizeof(struct ofp_stats_request);
        handler = recv_stats_request;
        break;
    case OFPT_ECHO_REQUEST:
        min_size = sizeof(struct ofp_header);
        handler = recv_echo_request;
        break;
    case OFPT_ECHO_REPLY:
        min_size = sizeof(struct ofp_header);
        handler = recv_echo_reply;
        break;
    case OFPT_QUEUE_GET_CONFIG_REQUEST:
        min_size = sizeof(struct ofp_header);
        handler = recv_queue_get_config_request;
        break;
    case OFPT_VENDOR:
        min_size = sizeof(struct ofp_vendor_header);
        handler = recv_vendor;
        break;
    default:
        dp_send_error_msg(dp, sender, OFPET_BAD_REQUEST, OFPBRC_BAD_TYPE,
                          msg, length);
        return -EINVAL;
    }

    /* Handle it. */
    if (length < min_size)
        return -EFAULT;
    return handler(dp, sender, msg);
}

Finally we come back to the very important send_openflow_buffer function, which has appeared several times already; here is its code.

send_openflow_buffer

  • The function first checks whether the message is a reply to one received from a controller: if so, sender is non-null and the first branch sends it straight back to that sender.
  • Otherwise the message is broadcast to all remotes, with send_openflow_buffer_to_remote pushing the buffer onto each secure channel.
int
send_openflow_buffer(struct datapath *dp, struct ofpbuf *buffer,
                     const struct sender *sender)
{
    update_openflow_length(buffer);
    if (sender) {
        /* Send back to the sender. */
        return send_openflow_buffer_to_remote(buffer, sender->remote);
    } else {
        /* Broadcast to all remotes. */
        struct remote *r, *prev = NULL;
        LIST_FOR_EACH (r, struct remote, node, &dp->remotes) {
            if (prev) {
                //send_openflow_buffer_to_remote(ofpbuf_clone(buffer), prev);
                send_openflow_buffer_to_remote(buffer, prev);
            }
            prev = r;
        }
        if (prev) {
            send_openflow_buffer_to_remote(buffer, prev);
        } else {
            ofpbuf_delete(buffer);
        }
        return 0;
    }
}
send_openflow_buffer_to_remote
static int
send_openflow_buffer_to_remote(struct ofpbuf *buffer, struct remote *remote)
{
    int retval = rconn_send_with_limit(remote->rconn, buffer, &remote->n_txq,
                                       TXQ_LIMIT);
    if (retval) {
        VLOG_WARN_RL(&rl, "send to %s failed: %s",
                     rconn_get_name(remote->rconn), strerror(retval));
    }
    return retval;
}
/* Sends 'b' on 'rc'.  Increments '*n_queued' while the packet is in flight; it
 * will be decremented when it has been sent (or discarded due to
 * disconnection).  Returns 0 if successful, EAGAIN if '*n_queued' is already
 * at least as large as 'queue_limit', or ENOTCONN if 'rc' is not currently
 * connected.  Regardless of return value, 'b' is destroyed.
 *
 * Because 'b' may be sent (or discarded) before this function returns, the
 * caller may not be able to observe any change in '*n_queued'.
 *
 * There is no rconn_send_wait() function: an rconn has a send queue that it
 * takes care of sending if you call rconn_run(), which will have the side
 * effect of waking up poll_block(). */
int
rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
                      int *n_queued, int queue_limit)
{
    int retval;
    retval = *n_queued >= queue_limit ? EAGAIN : rconn_send(rc, b, n_queued);
    if (retval) {
        ofpbuf_delete(b);
    }
    return retval;
}

These last pieces are straightforward and need little commentary: in short, everything goes out over the secure channel via rconn_send.

Since these functions are called directly from main, they are framework-level by nature. The remaining functional handlers will be covered in follow-up articles.