- 设置起一个模拟的网络环境,包括启动TUN虚拟网络接口,设置路由表等。
- 启动多个线程来读取和写入TUN接口,来模拟网络数据包的传输。
- 与物理仿真和网络仿真程序通信,使用Protobuf协议来交换时间步进和网络数据包信息。
- 统一管理时间步进,同步各个线程。
- 缓存和转发网络数据包,提供延迟和错误等网络效应。
- 支持驱动程序与仿真程序的通信,转发控制请求和响应。
总的来说,它实现了一个在用户空间模拟网络的框架,将物理层和网络层的仿真程序连接起来,模拟了完整的物理-MAC-网络栈。它可以支持多节点多通道的协调模拟。通过它,可以方便地在没有实际硬件的情况下,测试和开发涉及物理层和网络层交互的算法、协议等,比如MAC协议、routing等。它抽象了实际硬件,提供了一个软件定义的网络环境。
1.关于NetworkCoordinator类的介绍
1.1 头文件和NetworkCoordinator的初始化
import gzip # 用于处理gzip压缩文件
import ipaddress # 用于处理IP地址
import json # 用于处理JSON数据
import multiprocessing # 用于多进程编程
import os # 用于操作文件和目录
import socket # 用于网络通信
import struct # 用于处理二进制数据和字节序列
import subprocess # 用于在Python中执行外部命令
import sys # 提供对Python运行时环境的访问和控制
import threading # 用于多线程编程
import time # 用于时间相关操作
import array # 用于处理数组
import fcntl # 提供对文件描述符的控制
import numpy as np # 用于处理数值和数组
import select # 用于实现I/O多路复用
import yaml # 用于读取和写入YAML文件
import protobuf_msgs.physics_update_pb2 as phyud # 用于处理物理更新的协议缓冲区消息
import protobuf_msgs.network_update_pb2 as netud # 用于处理网络更新的协议缓冲区消息
class NetworkCoordinator:
"""
网络协调器类,用于协调网络通信的功能
"""
def __init__(self, config_file):
"""
类的构造函数,接受一个配置文件作为参数
"""
# event used to indicate whether threads are allowed to run
self.run_event = multiprocessing.Event() # 多进程事件对象,用于指示线程是否可以运行
# barriers used to sync the protobuf threads at the beginning and end of their loops
self.protobuf_sync_barrier_top = threading.Barrier(2, timeout=5) # 用于同步protobuf线程在循环开始时的线程屏障对象
self.protobuf_sync_barrier_bottom = threading.Barrier(2, timeout=10) # 用于同步protobuf线程在循环结束时的线程屏障对象
# global "time"
self.time_counter = AtomicCounter() # 全局时间的原子计数器对象
# global packet ID
self.packet_id = AtomicCounter() # 全局数据包ID的原子计数器对象
# incoming packets will be stored in this dict until their data is sent to the network simulator
self.incoming_packet_buffer = {} # 接收到的数据包将存储在此字典中,直到其数据被发送到网络模拟器
# to get TUN threads to wait while the buffer is consumed
self.incoming_packet_buffer_busy = threading.Lock() # 用于在缓冲区被使用时阻塞TUN线程的锁对象
# packets will be stored in this dict until the network simulator asks for them to be sent to destination
self.outgoing_packet_buffer = {} # 数据包将存储在此字典中,直到网络模拟器要求将其发送到目标地址
# stores ids of packets for each interval, and these are cleared as they time out
self.dispatch_record = {} # 存储每个时间间隔的数据包ID,并在超时时清除它们
# node & channel information
self.channel_data = None # 节点和通道信息
with open(config_file) as f:
self.config = yaml.load(f, Loader=yaml.FullLoader) # 加载配置文件数据
self.phy_use_uds = self.config['phy_use_uds'] # 从配置数据中提取phy_use_uds的值
self.net_use_uds = self.config['net_use_uds'] # 从配置数据中提取net_use_uds的值
1.2 网络设置
这段代码的主要功能是通过调用系统命令设置网络环境。它遍历配置文件中的IP地址列表,并执行一系列命令来创建和配置TUN设备、添加路由表规则和本地路由规则。最后,它返回配置文件中的IP地址列表。
请注意,这段代码使用了subprocess.call来执行系统命令,而不是在Python中直接实现相应的功能。因此,这些命令的具体功能和效果可以参考相应的系统命令文档。
def _setup_network(self):
"""
设置网络环境
"""
for i, ip_i in enumerate(self.config['ip_list']):
# 添加TUN设备
subprocess.call(["sudo", "ip", "tuntap", "add", "dev", "tun" + str(i), "mode", "tun"])
# 启用TUN设备
subprocess.call(["sudo", "ip", "link", "set", "tun" + str(i), "up"])
# 添加IP地址到TUN设备
subprocess.call(["sudo", "ip", "addr", "add", ip_i + "/32", "dev", "tun" + str(i)])
for j, ip_j in enumerate(self.config['ip_list']):
if j != i:
# 添加路由表规则,指定目标地址经由相应的TUN设备发送
subprocess.call(["sudo", "ip", "route", "add", ip_j + "/32", "dev", "tun" + str(i),
"src", ip_i, "table", str(i + 1)])
# 添加策略路由规则,指定源地址经由相应的TUN设备发送
subprocess.call(["sudo", "ip", "rule", "add", "table", str(i + 1), "from", ip_i, "priority", "2"])
# 添加条件本地路由规则
subprocess.call(["sudo", "ip", "rule", "add", "iif", "tun" + str(i), "table", str(i + 101), "priority", "1"])
# 添加本地路由表规则,指定本地地址经由相应的TUN设备发送
subprocess.call(["sudo", "ip", "route", "add", "local", ip_i, "dev", "tun" + str(i), "table", str(i + 101)])
# 删除默认的本地路由规则
subprocess.call(["sudo", "ip", "rule", "del", "pref", "0", "from", "all", "lookup", "local"])
# 添加新的本地路由规则
subprocess.call(["sudo", "ip", "rule", "add", "pref", "10", "from", "all", "lookup", "local"])
return self.config['ip_list']
1.3 网络设置移除
这段代码的功能是通过调用系统命令来移除之前设置的网络环境。它接受一个参数 num_ips,表示要移除的TUN设备的数量。然后,它使用循环依次删除每个TUN设备、路由表规则和条件本地路由规则。最后,它添加默认的本地路由规则并删除先前添加的本地路由规则。
请注意,与前面的代码段一样,这里也使用了 subprocess.call 来执行系统命令。具体命令的功能和效果可以参考相应的系统命令文档。
@staticmethod
def _remove_network(num_ips):
"""
移除网络设置
"""
for i in range(num_ips):
# 删除TUN设备
subprocess.call(["sudo", "ip", "tuntap", "del", "dev", "tun" + str(i), "mode", "tun"])
# 删除路由表规则
subprocess.call(["sudo", "ip", "rule", "del", "table", str(i + 1)])
subprocess.call(["sudo", "ip", "rule", "del", "table", str(i + 101)])
# 添加默认的本地路由规则
subprocess.call(["sudo", "ip", "rule", "add", "pref", "0", "from", "all", "lookup", "local"])
# 删除之前添加的本地路由规则
subprocess.call(["sudo", "ip", "rule", "del", "pref", "10", "from", "all", "lookup", "local"])
1.4 读取TUN设备数据
这段代码通过循环从指定的TUN设备中读取数据。它使用 select.select 函数来等待TUN设备就绪,并使用 os.read 函数读取数据。读取的数据会进行处理,提取其中的源IP地址和目标IP地址,并将数据保存到输入数据包缓冲区中。在保存数据之前,会对数据进行版本判断,只处理IPv4的数据包。最后,会打印一些调试信息和对应的TUN设备的退出消息,并关闭TUN设备。
请注意,这段代码中涉及到的一些变量和函数,如 self.run_event,self.config,self.incoming_packet_buffer,self.packet_id,以及其他未提供的代码片段,需要根据上下文来确定其具体含义和实现方式。
def _read_from_tuns(self, i, tuns):
"""
从TUN设备中读取数据
"""
while self.run_event.is_set():
# 从TUN设备中读取数据
try:
r, __, __ = select.select([tuns[i].fileno(), ], [], [], self.config['responsiveness_timeout'])
if r:
data = os.read(tuns[i].fileno(), 4096)
else:
continue
except OSError:
self.run_event.clear() # 结束所有线程
print("TUN " + str(i) + " 似乎已经断开连接,无法从中读取数据")
break
# 识别IP地址并保存到缓冲区中
version = data[0]
version = version >> 4
if version == 4:
ip_src = int.from_bytes(data[12:16], byteorder="big")
ip_dst = int.from_bytes(data[16:20], byteorder="big")
with self.incoming_packet_buffer_busy:
self.incoming_packet_buffer[(self.packet_id.value, ip_src, ip_dst)] = data
if self.config['print_debug']: print(f"从TUN {i} 读取 {(self.packet_id.value, ip_src, ip_dst)}")
self.packet_id.increment()
print("TUN " + str(i) + " 退出")
tuns[i].close()
1.5 物理协调器通信
这段代码通过Protobuf客户端与物理协调器进行通信。它根据配置选择使用UNIX域套接字(AF_UNIX)还是IP套接字(AF_INET)。然后,它尝试连接到指定的服务器地址。在一个循环中,它根据条件发送时间更新请求,等待响应,并解析收到的数据。根据消息的类型,它执行相应的操作,如更新通道数据和递增时间计数器。如果发生错误或超时,会打印错误信息并结束所有线程。最后,它关闭套接字并终止底部Protobuf同步屏障。
请注意,这段代码中涉及到的一些变量和函数,如 self.run_event,self.config,self.protobuf_sync_barrier_top,self.protobuf_sync_barrier_bottom,self.time_counter,self.send_one_message,self.recv_one_message,self.channel_data,phyud.PhysicsUpdate,以及其他未提供的代码片段,需要根据上下文来确定其具体含义和实现方式。
def _run_protobuf_client_phy_coord(self):
"""
运行物理协调器的Protobuf客户端
"""
# 连接到服务器的套接字端口
if self.phy_use_uds:
# 创建一个UNIX域套接字
server_address = self.config['phy_uds_server_address']
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
server_address = (self.config['phy_ip_server_address'], self.config['phy_ip_server_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
try:
if self.try_connecting(sock, server_address, "Error (physics coordinator protobuf client):") == -1:
self.run_event.clear()
return
waiting_for_update = False
while self.run_event.is_set():
if not waiting_for_update:
self.protobuf_sync_barrier_top.wait()
# 发送时间更新请求
time_update = phyud.PhysicsUpdate()
time_update.time_val = self.time_counter.value
time_update.msg_type = phyud.PhysicsUpdate.BEGIN
self.send_one_message(sock, gzip.compress(time_update.SerializeToString()))
waiting_for_update = True
# 等待完成
r, __, __ = select.select([sock, ], [], [], self.config['responsiveness_timeout'])
if r:
# 获取响应
data = self.recv_one_message(sock)
if not data: continue
waiting_for_update = False
# 解析时间更新消息
time_update = phyud.PhysicsUpdate()
time_update.ParseFromString(gzip.decompress(data))
if time_update.msg_type != phyud.PhysicsUpdate.END:
raise ValueError("Network Coordinator got non-END message from Physics Coordinator!")
self.channel_data = time_update.channel_data if time_update.channel_data else b""
i = self.protobuf_sync_barrier_bottom.wait()
if i == 0: # 只有一个线程会执行这个
self.time_counter.increment()
else:
continue
except socket.error as msg:
print("Error (physics coordinator protobuf client): %s" % (msg,))
self.run_event.clear() # 结束所有线程
except threading.BrokenBarrierError:
print("Physics coordinator client: Timeout/Abort while waiting for network simulator client")
self.run_event.clear() # 结束所有线程
finally:
print('Closing protobuf client socket (physics coordinator )', file=sys.stderr)
self.protobuf_sync_barrier_bottom.abort()
sock.close()
1.6 网络协调器通信
这段代码的功能是通过Protobuf与网络模拟器进行通信。它执行以下操作:
- 根据配置选择使用UNIX域套接字(UDS)还是IP套接字来创建套接字对象。
- 尝试连接到网络模拟器服务器。
- 在循环中,发送时间更新请求以获取模拟器的状态。
- 等待模拟器的更新响应。
- 处理接收到的更新数据。
- 根据更新数据,将数据包写入相应的TUN接口。
- 清理过时的数据包和记录。
- 最后关闭套接字连接。
请注意,这段代码缺少一些定义,如self.protobuf_sync_barrier_top、self.time_counter、self.incoming_packet_buffer等。因此,要完整运行代码,还需要查看其他部分的实现。
def _run_protobuf_client_net_sim(self, tuns, ip_to_tun_map):
"""
运行网络模拟器的Protobuf客户端
"""
# 连接到服务器的套接字端口
if self.net_use_uds:
# 创建一个UNIX域套接字
server_address = self.config['netsim_uds_server_address']
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
server_address = (self.config['netsim_ip_server_address'], self.config['netsim_ip_server_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
try:
if self.try_connecting(sock, server_address, "Error (network simulator protobuf client):") == -1:
self.run_event.clear()
return
waiting_for_update = False
while self.run_event.is_set():
if not waiting_for_update:
# 发送时间更新请求
self.protobuf_sync_barrier_top.wait()
current_time = self.time_counter.value
time_update = netud.NetworkUpdate()
time_update.time_val = current_time
time_update.msg_type = netud.NetworkUpdate.BEGIN
# 告诉TUN线程等待缓冲区清空
with self.incoming_packet_buffer_busy:
temp_packet_buffer = self.incoming_packet_buffer
self.incoming_packet_buffer = {}
if temp_packet_buffer:
self.dispatch_record[current_time] = []
for key_tuple, pkt_data in temp_packet_buffer.items():
time_update.pkt_id.append(key_tuple[0])
time_update.src_ip.append(key_tuple[1])
time_update.dst_ip.append(key_tuple[2])
time_update.pkt_lengths.append(len(pkt_data))
self.dispatch_record[current_time].append(key_tuple)
self.outgoing_packet_buffer.update(temp_packet_buffer)
if self.channel_data:
time_update.channel_data = self.channel_data
self.send_one_message(sock, gzip.compress(time_update.SerializeToString()))
waiting_for_update = True
# 等待更新
r, __, __ = select.select([sock, ], [], [], self.config['responsiveness_timeout'])
if r:
data = self.recv_one_message(sock)
if not data: continue
waiting_for_update = False
time_update = netud.NetworkUpdate()
time_update.ParseFromString(gzip.decompress(data))
for time_update_tuple in zip(time_update.pkt_id, time_update.src_ip, time_update.dst_ip,
time_update.ber, time_update.rx_ip):
if not self.run_event.is_set():
print("Network simulator was asked to exit while writing data")
break
ber = time_update_tuple[3]
key_tuple = time_update_tuple[:3]
if key_tuple not in self.outgoing_packet_buffer:
# 数据包已经丢失,可能是由于存储超时导致的
print(f"Packet {key_tuple} to be delivered was already dropped")
continue
data = self.outgoing_packet_buffer[key_tuple]
if key_tuple[2] == int(self.config['broadcast_address']):
# 将UDP广播(255.255.255.255)转换为单播。
data = bytearray(data)
data[16:20] = time_update_tuple[4].to_bytes(4, byteorder="big") # 设置IP
data[10:12] = b'\x00\x00' # 清除校验和
data[10:12] = self.generate_ipv4_checksum(data[:20]).to_bytes(2, byteorder="big")
data[26:28] = b'\x00\x00' # 清除UDP校验和
if ber == 0:
pass
elif 0 < ber < 1:
if not isinstance(data, bytearray): data = bytearray(data)
data = self._apply_ber(ber, data)
if not data: continue
else:
continue
try:
os.write(tuns[ip_to_tun_map[time_update_tuple[4]]].fileno(), data)
if self.config['print_debug']: print(f"Wrote {key_tuple} to TUN {ip_to_tun_map[time_update_tuple[4]]}")
except (OSError, KeyError, ValueError) as msg:
#print("Network simulator client: Error while writing to TUN. %s" % (msg,))
continue
for key_tuple in zip(time_update.clear_pkt_id, time_update.clear_src_ip, time_update.clear_dst_ip):
if key_tuple in self.outgoing_packet_buffer:
del self.outgoing_packet_buffer[key_tuple]
time_to_clear = current_time - self.config['packet_holding_duration']
if time_to_clear in self.dispatch_record:
for key_tuple in self.dispatch_record[time_to_clear]:
if key_tuple in self.outgoing_packet_buffer:
del self.outgoing_packet_buffer[key_tuple]
del self.dispatch_record[time_to_clear]
if time_update.msg_type != netud.NetworkUpdate.END:
raise ValueError("Network Coordinator got non-END message from Network Simulator!")
i = self.protobuf_sync_barrier_bottom.wait()
if i == 0: # 只有一个线程会执行这段代码
self.time_counter.increment()
except socket.error as msg:
print("Error (network simulator protobuf client): %s" % (msg,))
self.run_event.clear() # 结束所有线程
except threading.BrokenBarrierError:
print("Network simulator client: Timeout/Abort while waiting for physics coordinator client")
self.run_event.clear() # 结束所有线程
finally:
self.protobuf_sync_barrier_bottom.abort()
print('Closing protobuf client socket (network simulator)', file=sys.stderr)
sock.close()
1.7 IPv4头部的校验和
这段代码的功能是计算IPv4头部的校验和。IPv4头部包含多个字段,其中一个字段就是校验和。校验和用于验证IP头部在传输过程中是否出现了错误或损坏。该方法执行以下操作:
检查IP头部的长度是否为奇数,如果是则在末尾添加一个空字节,以确保字节数组的长度为偶数。 将IP头部按照16位的字节数组进行分组,并计算它们的总和。 将求和结果的高16位与低16位相加,以便将溢出的位添加到总和中。 将相加后的结果的高16位与低16位再次相加,以确保校验和的位数不超过16位。 对结果取反,得到校验和的补码形式。 假设使用小端字节序,将校验和的字节顺序进行调整,确保正确的字节顺序。 返回最终的校验和值。 这段代码的目的是计算IPv4头部的校验和,以便在网络通信中进行数据完整性的验证。
@staticmethod
def generate_ipv4_checksum(ip_header):
"""
生成IPv4头部的校验和
"""
# 如果IP头部的长度是奇数,则在末尾添加一个空字节
if len(ip_header) % 2 == 1:
ip_header += "\0"
# 将IP头部按照16位的字节数组进行分组求和
checksum = sum(array.array("H", ip_header))
# 将求和结果的高16位与低16位相加
checksum = (checksum >> 16) + (checksum & 0xffff)
# 将相加后的结果的高16位与低16位再次相加
checksum += (checksum >> 16)
# 取反得到校验和的补码形式
checksum = ~checksum
# 假设使用小端字节序,将校验和字节顺序进行调整
return (((checksum >> 8) & 0xff) | checksum << 8) & 0xffff
1.8 比特错误率应用
这段代码的功能是将比特错误率(BER)应用到给定的数据上。方法执行以下操作:
- 根据比特错误率(BER)和数据的长度,使用self._get_flip_locations方法获取要翻转(改变)的位置。
- 对于每个要翻转的位置,检查字节位置是否小于20,如果是,则返回空(不进行翻转)。
- 对于字节位置大于等于20的情况,使用异或操作(XOR)对指定位置的比特进行翻转,将特定比特位置上的值从0变为1或从1变为0。
- 返回处理后的数据。
这段代码的目的是模拟网络通信中的比特错误,根据给定的比特错误率随机地翻转数据中的比特位,以模拟传输过程中的数据传输错误。
def _apply_ber(self, ber, data):
"""
将比特错误率应用到给定数据上
"""
# 根据比特错误率和数据长度获取要翻转的位置
for byte_number, bit_to_flip in self._get_flip_locations(ber, len(data)):
# 如果字节位置小于20,则返回空
if byte_number < 20:
return None
# 对指定位置的比特进行翻转
data[byte_number] = data[byte_number] ^ 1 << bit_to_flip
# 返回处理后的数据
return data
1.9 驱动器请求
这段代码的功能是在驱动器请求之间进行传输。主要步骤如下:
- 初始化计时器对象send_stuff_timer为None。
- 创建一个互斥锁对象request_set_lock,用于保护驱动器请求集合的并发访问。
- 创建一个空的驱动器请求集合request_set。
- 定义了一个内部函数send_stuff(),用于发送驱动器请求。
- 在主循环中,只要self.run_event为True,就会使用select()函数等待可读取的物理驱动器套接字。
- 如果有可读取的套接字,则依次处理其中的每个套接字:
- 使用互斥锁phy_socket_to_lock保护可读取套接字,接收从套接字收到的消息。
- 如果收到的消息为空,则跳过。
- 如果配置中设置了打印调试信息,打印消息已读取。
- 将收到的消息解码为字符串,并使用互斥锁request_set_lock将其添加到驱动器请求集合中。
- 如果驱动器请求集合不为空,并且send_stuff_timer为None或者计时器对象不处于活动状态时,创建并启动一个计时器send_stuff_timer,定时调用send_stuff()函数发送驱动器请求。
- 如果没有可读取的套接字,则继续下一次循环。
整体而言,这段代码通过监听物理驱动器套接字,接收驱动器请求,并定期将这些请求发送到网络模拟器。它使用互斥锁来确保对驱动器请求集合的并发访问安全,并使用计时器来定时发送请求。
def _transfer_driver_requests(self, phy_driver_sockets, phy_socket_to_lock, netsim_driver_socket, netsim_lock):
"""
在驱动器请求之间进行传输
"""
send_stuff_timer = None # 定义一个计时器对象
request_set_lock = threading.Lock() # 定义一个互斥锁对象
request_set = set() # 定义一个集合,用于存储驱动器请求
def send_stuff():
"""
发送驱动器请求
"""
with request_set_lock:
driver_requests_string = ("[" + ",".join(request_set) + "]").encode("utf-8") # 将驱动器请求转换为字符串并编码为字节流
request_set.clear() # 清空驱动器请求集合
try:
with netsim_lock:
self.send_one_message(netsim_driver_socket, gzip.compress(driver_requests_string)) # 使用网络模拟驱动器套接字发送压缩后的驱动器请求
if self.config['print_debug']:
print(f"Wrote driver request to network simulator") # 打印调试信息(写入驱动器请求到网络模拟器)
except socket.error as msg:
print("Error (Driver request transfer): %s" % (msg,))
while self.run_event.is_set():
r, __, __ = select.select(phy_driver_sockets, [], [], self.config['responsiveness_timeout']) # 使用select函数等待可读取的物理驱动器套接字
if r:
try:
for readable_socket in r:
with phy_socket_to_lock[readable_socket]:
incoming_message_string = self.recv_one_message(readable_socket) # 接收来自可读取套接字的消息
if not incoming_message_string:
continue
if self.config['print_debug']:
print(f"Read driver request") # 打印调试信息(读取驱动器请求)
incoming_message_string = incoming_message_string.decode("utf-8") # 将收到的消息解码为字符串
with request_set_lock:
request_set.add(incoming_message_string) # 将驱动器请求添加到请求集合中
except socket.error as msg:
print("Error (Driver request transfer): %s" % (msg,))
if request_set and (send_stuff_timer is None or not send_stuff_timer.is_alive()):
send_stuff_timer = threading.Timer(self.config['driver_sync_time'], function=send_stuff) # 创建一个计时器对象,定时发送驱动器请求
send_stuff_timer.setDaemon(True)
send_stuff_timer.start()
else:
continue # 继续下一次循环
1.10 驱动器响应
这段代码的功能是在驱动器响应之间进行传输。主要步骤如下:
- 创建一个字典mac_to_index,将MAC地址映射到索引,用于快速查找对应的套接字索引。
- 在主循环中,只要self.run_event为True,就会使用select()函数等待可读取的网络模拟驱动器套接字。
- 如果有可读取的套接字,则执行以下操作:
- 使用网络模拟驱动器套接字接收消息。
- 如果接收到的数据列表为空,则跳过。
- 如果配置中设置了打印调试信息,打印已从网络模拟器读取驱动器响应的消息。
- 对解压缩后的数据列表进行遍历,处理每个驱动器响应数据:
- 获取驱动器响应中的源MAC地址。
- 如果源MAC地址不存在,则跳过。
- 根据源MAC地址查找对应的套接字索引。
- 获取对应的物理驱动器套接字。
- 使用物理驱动器套接字发送经过JSON编码的驱动器响应数据。
- 如果配置中设置了打印调试信息,打印已将驱动器响应写入套接字的消息。
- 如果没有可读取的套接字,则继续下一次循环。
总体而言,这段代码通过监听网络模拟驱动器套接字,接收驱动器响应并将其发送到相应的物理驱动器套接字。它使用了一个映射表来进行源MAC地址的索引查找,以便将驱动器响应发送到正确的物理驱动器套接字。代码还包含了一些调试信息的打印。
def _transfer_driver_responses(self, phy_driver_sockets, phy_socket_locks, netsim_driver_socket, netsim_lock):
"""
在驱动器响应之间进行传输
"""
mac_to_index = {mac_i: i for i, mac_i in enumerate(self.config['mac_list'])} # 创建一个字典,将MAC地址映射到索引
while self.run_event.is_set():
r, __, __ = select.select([netsim_driver_socket, ], [], [], self.config['responsiveness_timeout']) # 使用select函数等待可读取的网络模拟驱动器套接字
if r:
try:
with netsim_lock:
data_list = self.recv_one_message(netsim_driver_socket) # 接收来自网络模拟驱动器套接字的消息
if not data_list:
continue
if self.config['print_debug']:
print(f"Read driver response from network simulator") # 打印调试信息(从网络模拟器读取驱动器响应)
for data in json.loads(gzip.decompress(data_list)): # 解压缩消息并遍历每个驱动器响应数据
src_mac = data.get("src_mac", None) # 获取驱动器响应中的源MAC地址
if not src_mac:
continue
socket_index = mac_to_index[src_mac] # 根据源MAC地址查找对应的套接字索引
appropriate_socket = phy_driver_sockets[socket_index] # 获取对应的物理驱动器套接字
with phy_socket_locks[socket_index]:
self.send_one_message(appropriate_socket, json.dumps(data).encode("utf-8")) # 发送驱动器响应数据到物理驱动器套接字
if self.config['print_debug']:
print(f"Wrote driver response to socket {socket_index}") # 打印调试信息(将驱动器响应写入套接字)
except socket.error as msg:
print("Error (Driver response transfer): %s" % (msg,))
else:
continue # 继续下一次循环
1.11 驱动器运行
这段代码的功能是运行驱动器进程。主要步骤如下:
- 根据配置中的设置,确定使用的套接字类型(UNIX域套接字或IPv4套接字)以及相应的服务器地址。
- 创建网络模拟器驱动器套接字对象,并根据需要设置套接字选项。
- 尝试连接到网络模拟器驱动器套接字,如果连接失败则返回。
- 创建一个线程5. 创建一个线程锁对象,用于保护网络模拟器套接字的访问。
- 创建物理驱动器套接字列表,并为每个物理驱动器创建套接字对象。尝试连接到每个物理驱动器套接字,如果连接失败则跳过该物理驱动器。
- 创建物理驱动器套接字锁列表,并建立物理驱动器套接字与锁的映射关系。
- 创建读取线程,用于在驱动器之间传输请求。
- 创建写入线程,用于在驱动器之间传输响应。
- 启动读取线程和写入线程。
- 等待读取线程和写入线程结束。
总体而言,这段代码的目的是在驱动器之间建立连接,并在不同线程中进行请求和响应的传输。
def _run_driver_process(self, num_interfaces):
"""
运行驱动器进程
"""
if self.net_use_uds:
server_address = self.config['net_driver_uds_server_address'] # 获取网络驱动器的UNIX域套接字地址
netsim_driver_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 创建UNIX域套接字对象
else:
server_address = (self.config['netsim_ip_server_address'], self.config['netsim_ip_ranging_port']) # 获取网络模拟器的IP地址和端口号
netsim_driver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建IPv4套接字对象
netsim_driver_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 设置套接字选项,禁用Nagle算法
netsim_driver_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1) # 设置套接字选项,启用快速ACK
try:
if self.try_connecting(netsim_driver_socket, server_address, "Error (Network simulator driver client):") == -1:
return
netsim_lock = threading.Lock() # 创建一个线程锁对象,用于保护网络模拟器套接字的访问
phy_driver_sockets = [] # 物理驱动器套接字列表
phy_socket_locks = [] # 物理驱动器套接字锁列表
phy_socket_to_lock = {} # 物理驱动器套接字到锁的映射字典
for i in range(num_interfaces):
phy_socket_server_address = self.config['phy_driver_uds_server_address'] + str(i) # 获取物理驱动器的UNIX域套接字地址
phy_driver_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 创建UNIX域套接字对象
if self.try_connecting(phy_driver_socket, phy_socket_server_address, f"Error (Physics coordinator driver client {i}):") == -1:
continue
phy_driver_sockets.append(phy_driver_socket) # 将物理驱动器套接字添加到列表中
phy_socket_lock = threading.Lock() # 创建线程锁对象,用于保护物理驱动器套接字的访问
phy_socket_locks.append(phy_socket_lock) # 将物理驱动器套接字锁添加到列表中
phy_socket_to_lock[phy_driver_socket] = phy_socket_lock # 将物理驱动器套接字与锁的映射关系添加到字典中
except socket.error as msg:
if str(msg).strip():
print("Error (driver process): %s" % (msg,))
return
read_thread = threading.Thread(target=self._transfer_driver_requests, args=(
phy_driver_sockets, phy_socket_to_lock, netsim_driver_socket, netsim_lock)) # 创建读取线程,用于传输驱动器请求
write_thread = threading.Thread(target=self._transfer_driver_responses, args=(
phy_driver_sockets, phy_socket_locks, netsim_driver_socket, netsim_lock)) # 创建写入线程,用于传输驱动器响应
read_thread.start() # 启动读取线程
write_thread.start() # 启动写入线程
read_thread.join() # 等待读取线程结束
write_thread.join() # 等待写入线程结束
1.12 连接尝试
这段代码的功能是在指定的套接字和地址之间尝试建立连接,并返回连接结果。具体步骤如下:
- 初始化连接尝试次数计数器为0。
- 在循环中,尝试通过套接字的connect方法连接到指定的地址。
- 如果连接成功,跳出循环。
- 如果连接失败(捕获到socket.error异常),将连接尝试次数计数器加1,并等待2秒后再进行下一次尝试。
- 如果连接尝试次数超过100次,打印连接失败的错误信息。
- 返回连接失败的标识(-1)。
- 返回连接成功的标识(0)。
该方法的作用是在连接建立之前进行多次尝试,以增加连接成功的概率,并提供一定的错误处理。
@staticmethod
def try_connecting(sock, address, tag):
"""
尝试在给定的套接字和地址之间建立连接
"""
i = 0 # 连接尝试次数计数器
while i < 100: # 最多尝试100次
try:
sock.connect(address) # 尝试连接套接字和地址
break # 如果连接成功,跳出循环
except socket.error: # 捕获套接字错误异常
i += 1 # 连接尝试次数加1
time.sleep(2) # 等待2秒后再进行下一次尝试
else:
print(tag + " Could not connect") # 如果连接尝试次数超过100次,打印连接失败的错误信息
return -1 # 返回连接失败的标识
return 0 # 返回连接成功的标识
1.13 数据包翻转
这段代码的功能是生成数据包中需要进行位翻转的位置。具体步骤如下:
- 初始化当前字节位置为0。
- 在循环中,生成满足指定伯努利分布的随机数,表示到下一个错误位的距离。
- 将距离转换为字节和位,其中space_to_next_error_byte表示距离下一个错误位的字节数,bit_to_flip表示需要翻转的位。
- 更新当前字节位置,将其增加space_to_next_error_byte。
- 如果当前字节位置仍然小于数据包长度,则生成当前字节和位翻转的位置。
- 重复步骤2到步骤5,直到当前字节位置不小于数据包长度。
通过生成器(yield语句),该方法可以按需逐步生成数据包中需要翻转的位的位置。
@staticmethod
def _get_flip_locations(ber, packet_length):
"""
生成数据包中翻转位的位置
"""
current_byte = 0 # 当前字节位置
while current_byte < packet_length: # 当前字节位置小于数据包长度时进行循环
space_to_next_error_bit = np.random.geometric(p=ber) # 生成满足指定伯努利分布的随机数,表示到下一个错误位的距离
space_to_next_error_byte, bit_to_flip = divmod(space_to_next_error_bit, 8) # 将到下一个错误位的距离转换为字节和位
current_byte = current_byte + space_to_next_error_byte # 更新当前字节位置
if current_byte < packet_length: # 如果当前字节位置仍然小于数据包长度
yield current_byte, bit_to_flip # 生成当前字节和位翻转的位置
1.14 数据发送
这段代码的功能是将数据发送到指定的套接字。具体步骤如下:
- 获取数据的长度,使用len(data)方法获得数据的字节数。
- 使用struct.pack('!I', length)将数据的长度打包为4字节的无符号整数。'!I'表示使用网络字节序(big-endian)对无符号整数进行打包。
- 将打包后的数据长度和数据本身进行拼接,得到完整的消息。
- 使用套接字的sendall()方法将消息发送到套接字。
通过这段代码,可以将数据按照特定的格式发送到指定的套接字中。在发送消息时,先发送一个4字节的无符号整数表示消息的长度,然后再发送消息的内容。
@staticmethod
def send_one_message(sock, data):
"""
将数据发送到套接字
"""
length = len(data) # 获取数据的长度
sock.sendall(struct.pack('!I', length) + data) # 将数据的长度打包为4字节的无符号整数,并与数据本身进行拼接后发送到套接字
1.15 数据接收
这段代码的功能是从套接字接收一条完整的消息。具体步骤如下:
- 调用类方法recvall(sock, 4),从套接字接收4字节的数据作为消息的长度。这里使用了类方法recvall来确保接收到指定长度的数据。
- 如果接收到的长度数据为空(表示连接已关闭或出错),则返回None。
- 使用struct.unpack('!I', lengthbuf)将接收到的长度数据解包为一个无符号整数。'!I'表示使用网络字节序(big-endian)进行解包。
- 调用类方法recvall(sock, length),从套接字接收指定长度的消息数据。同样,这里使用了类方法recvall来确保接收到指定长度的数据。
- 返回接收到的完整消息数据。
通过这段代码,可以方便地从套接字中接收完整的消息,首先接收4字节的长度信息,然后根据长度信息接收相应长度的消息数据。
@classmethod
def recv_one_message(cls, sock):
"""
从套接字接收一条消息
"""
lengthbuf = cls.recvall(sock, 4) # 调用类方法recvall,从套接字接收4字节的数据作为消息长度
if not lengthbuf: # 如果接收到的长度数据为空(表示连接关闭或出错),返回None
return None
length, = struct.unpack('!I', lengthbuf) # 将接收到的长度数据解包为一个无符号整数,使用网络字节序(big-endian)
return cls.recvall(sock, length) # 调用类方法recvall,从套接字接收指定长度的消息数据
1.16 recvall
这段代码的功能是从套接字接收指定数量的数据。具体步骤如下:
- 创建一个字节数组buf,用于存储接收到的数据。
- 在循环中,当还需要接收的数据数量count不为0时,继续接收数据。
- 使用sock.recv(count)从套接字接收指定数量的数据,并将其存储在newbuf中。
- 如果接收到的数据newbuf为空(表示连接已关闭或出错),则返回None。
- 将接收到的数据newbuf追加到存储数据的字节数组buf中。
- 更新还需要接收的数据数量count,减去已接收数据的长度len(newbuf)。
- 重复步骤2到步骤6,直到接收到指定数量的数据。
- 返回接收到的完整数据buf。
通过这段代码,可以确保从套接字中接收到指定数量的完整数据。它在循环中多次调用sock.recv()方法,直到接收到所需的数据量为止,并将接收到的数据存储在字节数组中,然后将其作为结果返回。
@staticmethod
def recvall(sock, count):
"""
从套接字接收指定数量的数据
"""
buf = bytearray() # 创建一个字节数组用于存储接收到的数据
while count: # 当还需要接收的数据数量不为0时进行循环
newbuf = sock.recv(count) # 从套接字接收指定数量的数据,根据前面,默认传入4个字节
if not newbuf: # 如果接收到的数据为空(表示连接已关闭或出错),返回None
return None
buf += newbuf # 将接收到的数据追加到存储数据的字节数组中
count -= len(newbuf) # 更新还需要接收的数据数量
return buf # 返回接收到的完整数据
1.17 网络协调器运行
这段代码的功能是运行网络协调器。具体步骤如下:
- 使用self._setup_network()方法设置网络并获取IP地址列表。
- 获取IP地址列表的长度,存储在num_ips变量中。
- 定义一些打开TUN设备所需的常量。
- 创建一个空列表tuns用于存储打开的TUN设备。
- 创建一个空列表tun_threads用于存储TUN线程。
- 初始化phy_protobuf_thread和netsim_protobuf_thread为None。
- 初始化driver_process为None。
- 设置运行事件标志self.run_event为True。
- 在try块中执行以下操作:
- 将IP地址列表转换为字典ip_to_tun_map,其中IP地址映射到TUN设备的索引。
- 使用open('/dev/net/tun', 'r+b', buffering=0)打开TUN设备,并将其添加到tuns列表中。
- 使用fcntl.ioctl设置T上部分回答被截断了,以下是继续的回答:
- 使用fcntl.ioctl设置TUN设备的参数。
- 启动TUN线程,每个线程读取特定的TUN设备。
- 启动物理层protobuf线程phy_protobuf_thread和网络仿真protobuf线程netsim_protobuf_thread。
- 如果配置中do_driver_transfer为True,则启动驱动进程driver_process。
- 在主线程中阻塞,等待用户输入来停止程序。
- 如果收到用户输入,抛出KeyboardInterrupt异常来退出程序。
- 如果捕获到KeyboardInterrupt异常,输出信息并清除运行事件标志。
- 等待所有TUN线程、物理层protobuf线程、网络仿真protobuf线程和驱动进程结束。
- 最后,调用self._remove_network(num_ips)方法移除网络。
请注意,这段代码是一个方法的片段,它使用了一些类成员变量和方法,其中的具体实现细节无法确定,因此无法提供完整的功能描述。
def run_network_coordinator(self):
"""
运行网络协调器
"""
ip_list = self._setup_network() # 设置网络并返回IP地址列表
num_ips = len(ip_list) # 获取IP地址列表的长度
# 打开TUN设备的常量
TUNSETIFF = 0x400454ca
TUNSETOWNER = TUNSETIFF + 2
IFF_TUN = 0x0001
IFF_NO_PI = 0x1000
tuns = [] # 存储打开的TUN设备
tun_threads = [] # 存储TUN线程
phy_protobuf_thread = None # 物理层protobuf线程
netsim_protobuf_thread = None # 网络仿真protobuf线程
driver_process = None # 驱动进程
self.run_event.set() # 设置运行事件标志
try:
# 读取IP地址
ip_to_tun_map = {int(ipaddress.IPv4Address(ip_i)): i for i, ip_i in enumerate(ip_list)}
# 打开TUN设备
for i in range(num_ips):
tuns.append(open('/dev/net/tun', 'r+b', buffering=0))
ifri = struct.pack('16sH', b'tun' + str(i).encode('ascii'), IFF_TUN | IFF_NO_PI)
fcntl.ioctl(tuns[i], TUNSETIFF, ifri)
fcntl.ioctl(tuns[i], TUNSETOWNER, 1000)
# 启动TUN线程
for i in range(num_ips):
tun_threads.append(threading.Thread(target=self._read_from_tuns, args=(i, tuns,)))
tun_threads[i].start()
# 启动protobuf线程
phy_protobuf_thread = threading.Thread(target=self._run_protobuf_client_phy_coord, args=())
netsim_protobuf_thread = threading.Thread(target=self._run_protobuf_client_net_sim,
args=(tuns, ip_to_tun_map))
phy_protobuf_thread.start()
netsim_protobuf_thread.start()
# 启动驱动进程
if self.config['do_driver_transfer']:
driver_process = multiprocessing.Process(target=self._run_driver_process, args=(num_ips,))
driver_process.start()
# 阻塞主线程
print("输入任何内容停止\n")
while self.run_event.is_set():
r, __, __ = select.select([sys.stdin, ], [], [], self.config['responsiveness_timeout'])
if r:
raise KeyboardInterrupt
else:
continue
raise KeyboardInterrupt
except KeyboardInterrupt:
print("尝试关闭线程")
self.run_event.clear()
if tun_threads:
for i in tun_threads:
i.join()
if phy_protobuf_thread:
phy_protobuf_thread.join()
if netsim_protobuf_thread:
netsim_protobuf_thread.join()
if driver_process:
driver_process.join()
print("线程成功关闭")
finally:
self._remove_network(num_ips) # 移除网络
2.AtomicCounter类
这段代码定义了一个原子计数器类AtomicCounter,它具有以下功能:
- init(self, initial=0):类的构造函数,用于初始化原子计数器对象。可以传入一个初始值initial,默认为0。
- increment(self, num=1):原子地将计数器的值增加num(默认为1),并返回增加后的新值。该方法使用了线程锁_lock,以确保在多线程环境下对计数器的操作是原子的。
请注意,原子计数器类的目的是在多线程环境中实现线程安全的计数操作,以避免竞态条件和数据不一致的问题。
class AtomicCounter:
def __init__(self, initial=0):
"""初始化一个新的原子计数器,初始值为给定的initial(默认为0)"""
self.value = initial # 计数器的值
self._lock = threading.Lock() # 用于实现原子操作的锁对象
def increment(self, num=1):
"""原子地将计数器增加num(默认为1),并返回新的值"""
with self._lock: # 使用锁对象保证原子操作
self.value += num # 增加计数器的值
return self.value # 返回新的计数器值
3. 主函数
这段代码的主要功能是作为程序的入口点,并调用main函数来运行网络协调器。具体步骤如下:
- main(args)函数是程序的入口函数,它接受一个参数args,表示命令行参数。
- 如果命令行参数的数量不等于2,即不满足len(args) != 2条件,打印使用说明。
- 如果命令行参数的数量等于2,即满足len(args) == 2条件,说明提供了一个配置文件路径作为参数。
- 创建一个NetworkCoordinator对象,传入配置文件路径作为参数,即NetworkCoordinator(args[1])。
- 调用network_coordinator对象的run_network_coordinator()方法,运行网络协调器的功能。
- return 0表示程序正常退出并返回退出码0。
然后,使用if name == 'main':来判断当前脚本是否作为主程序执行。如果是主程序执行,则调用main(sys.argv)来运行主函数,并使用sys.exit()退出程序,返回main函数的返回值。
该代码的作用是在命令行中运行网络协调器,并根据命令行参数指定的配置文件来配置和控制网络协调器的行为。
def main(args):
if len(args) != 2:
print("usage: network_coordinator.py <config_file>")
else:
network_coordinator = NetworkCoordinator(args[1]) # 创建一个网络协调器对象,传入配置文件路径作为参数
network_coordinator.run_network_coordinator() # 运行网络协调器的功能
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv)) # 执行main函数并将命令行参数传递给它,然后使用sys.exit退出程序,返回main函数的返回值
评论区