目 录CONTENT

文章目录

network_coordinator

Rho
Rho
2023-08-20 / 0 评论 / 0 点赞 / 36 阅读 / 38425 字
这段代码是一个网络协调器,它的主要作用是:
  • 设置起一个模拟的网络环境,包括启动TUN虚拟网络接口,设置路由表等。
  • 启动多个线程来读取和写入TUN接口,来模拟网络数据包的传输。
  • 与物理仿真和网络仿真程序通信,使用Protobuf协议来交换时间步进和网络数据包信息。
  • 统一管理时间步进,同步各个线程。
  • 缓存和转发网络数据包,提供延迟和错误等网络效应。
  • 支持驱动程序与仿真程序的通信,转发控制请求和响应。

总的来说,它实现了一个在用户空间模拟网络的框架,将物理层和网络层的仿真程序连接起来,模拟了完整的物理-MAC-网络栈。它可以支持多节点多通道的协调模拟。通过它,可以方便地在没有实际硬件的情况下,测试和开发涉及物理层和网络层交互的算法、协议等,比如MAC协议、routing等。它抽象了实际硬件,提供了一个软件定义的网络环境。

1.关于NetworkCoordinator类的介绍

1.1 头文件和NetworkCoordinator的初始化

import gzip  # 用于处理gzip压缩文件
import ipaddress  # 用于处理IP地址
import json  # 用于处理JSON数据
import multiprocessing  # 用于多进程编程
import os  # 用于操作文件和目录
import socket  # 用于网络通信
import struct  # 用于处理二进制数据和字节序列
import subprocess  # 用于在Python中执行外部命令
import sys  # 提供对Python运行时环境的访问和控制
import threading  # 用于多线程编程
import time  # 用于时间相关操作

import array  # 用于处理数组
import fcntl  # 提供对文件描述符的控制
import numpy as np  # 用于处理数值和数组
import select  # 用于实现I/O多路复用
import yaml  # 用于读取和写入YAML文件

import protobuf_msgs.physics_update_pb2 as phyud  # 用于处理物理更新的协议缓冲区消息
import protobuf_msgs.network_update_pb2 as netud  # 用于处理网络更新的协议缓冲区消息

class NetworkCoordinator:
    """
    网络协调器类,用于协调网络通信的功能
    """

    def __init__(self, config_file):
        """
        类的构造函数,接受一个配置文件作为参数
        """

        # event used to indicate whether threads are allowed to run
        self.run_event = multiprocessing.Event()  # 多进程事件对象,用于指示线程是否可以运行

        # barriers used to sync the protobuf threads at the beginning and end of their loops
        self.protobuf_sync_barrier_top = threading.Barrier(2, timeout=5)  # 用于同步protobuf线程在循环开始时的线程屏障对象
        self.protobuf_sync_barrier_bottom = threading.Barrier(2, timeout=10)  # 用于同步protobuf线程在循环结束时的线程屏障对象

        # global "time"
        self.time_counter = AtomicCounter()  # 全局时间的原子计数器对象

        # global packet ID
        self.packet_id = AtomicCounter()  # 全局数据包ID的原子计数器对象

        # incoming packets will be stored in this dict until their data is sent to the network simulator
        self.incoming_packet_buffer = {}  # 接收到的数据包将存储在此字典中,直到其数据被发送到网络模拟器

        # to get TUN threads to wait while the buffer is consumed
        self.incoming_packet_buffer_busy = threading.Lock()  # 用于在缓冲区被使用时阻塞TUN线程的锁对象

        # packets will be stored in this dict until the network simulator asks for them to be sent to destination
        self.outgoing_packet_buffer = {}  # 数据包将存储在此字典中,直到网络模拟器要求将其发送到目标地址

        # stores ids of packets for each interval, and these are cleared as they time out
        self.dispatch_record = {}  # 存储每个时间间隔的数据包ID,并在超时时清除它们

        # node & channel information
        self.channel_data = None  # 节点和通道信息

        with open(config_file) as f:
            self.config = yaml.load(f, Loader=yaml.FullLoader)  # 加载配置文件数据
            self.phy_use_uds = self.config['phy_use_uds']  # 从配置数据中提取phy_use_uds的值
            self.net_use_uds = self.config['net_use_uds']  # 从配置数据中提取net_use_uds的值

1.2 网络设置

这段代码的主要功能是通过调用系统命令设置网络环境。它遍历配置文件中的IP地址列表,并执行一系列命令来创建和配置TUN设备、添加路由表规则和本地路由规则。最后,它返回配置文件中的IP地址列表。

请注意,这段代码使用了subprocess.call来执行系统命令,而不是在Python中直接实现相应的功能。因此,这些命令的具体功能和效果可以参考相应的系统命令文档。

def _setup_network(self):
    """
    设置网络环境
    """

    for i, ip_i in enumerate(self.config['ip_list']):
        # 添加TUN设备
        subprocess.call(["sudo", "ip", "tuntap", "add", "dev", "tun" + str(i), "mode", "tun"])
        # 启用TUN设备
        subprocess.call(["sudo", "ip", "link", "set", "tun" + str(i), "up"])
        # 添加IP地址到TUN设备
        subprocess.call(["sudo", "ip", "addr", "add", ip_i + "/32", "dev", "tun" + str(i)])

        for j, ip_j in enumerate(self.config['ip_list']):
            if j != i:
                # 添加路由表规则,指定目标地址经由相应的TUN设备发送
                subprocess.call(["sudo", "ip", "route", "add", ip_j + "/32", "dev", "tun" + str(i),
                                 "src", ip_i, "table", str(i + 1)])
        # 添加策略路由规则,指定源地址经由相应的TUN设备发送
        subprocess.call(["sudo", "ip", "rule", "add", "table", str(i + 1), "from", ip_i, "priority", "2"])

        # 添加条件本地路由规则
        subprocess.call(["sudo", "ip", "rule", "add", "iif", "tun" + str(i), "table", str(i + 101), "priority", "1"])
        # 添加本地路由表规则,指定本地地址经由相应的TUN设备发送
        subprocess.call(["sudo", "ip", "route", "add", "local", ip_i, "dev", "tun" + str(i), "table", str(i + 101)])

    # 删除默认的本地路由规则
    subprocess.call(["sudo", "ip", "rule", "del", "pref", "0", "from", "all", "lookup", "local"])
    # 添加新的本地路由规则
    subprocess.call(["sudo", "ip", "rule", "add", "pref", "10", "from", "all", "lookup", "local"])

    return self.config['ip_list']

1.3 网络设置移除

这段代码的功能是通过调用系统命令来移除之前设置的网络环境。它接受一个参数 num_ips,表示要移除的TUN设备的数量。然后,它使用循环依次删除每个TUN设备、路由表规则和条件本地路由规则。最后,它添加默认的本地路由规则并删除先前添加的本地路由规则。

请注意,与前面的代码段一样,这里也使用了 subprocess.call 来执行系统命令。具体命令的功能和效果可以参考相应的系统命令文档。

@staticmethod
def _remove_network(num_ips):
    """
    移除网络设置
    """

    for i in range(num_ips):
        # 删除TUN设备
        subprocess.call(["sudo", "ip", "tuntap", "del", "dev", "tun" + str(i), "mode", "tun"])
        # 删除路由表规则
        subprocess.call(["sudo", "ip", "rule", "del", "table", str(i + 1)])
        subprocess.call(["sudo", "ip", "rule", "del", "table", str(i + 101)])

    # 添加默认的本地路由规则
    subprocess.call(["sudo", "ip", "rule", "add", "pref", "0", "from", "all", "lookup", "local"])
    # 删除之前添加的本地路由规则
    subprocess.call(["sudo", "ip", "rule", "del", "pref", "10", "from", "all", "lookup", "local"])

1.4 读取TUN设备数据

这段代码通过循环从指定的TUN设备中读取数据。它使用 select.select 函数来等待TUN设备就绪,并使用 os.read 函数读取数据。读取的数据会进行处理,提取其中的源IP地址和目标IP地址,并将数据保存到输入数据包缓冲区中。在保存数据之前,会对数据进行版本判断,只处理IPv4的数据包。最后,会打印一些调试信息和对应的TUN设备的退出消息,并关闭TUN设备。

请注意,这段代码中涉及到的一些变量和函数,如 self.run_event,self.config,self.incoming_packet_buffer,self.packet_id,以及其他未提供的代码片段,需要根据上下文来确定其具体含义和实现方式。

def _read_from_tuns(self, i, tuns):
    """
    从TUN设备中读取数据
    """

    while self.run_event.is_set():
        # 从TUN设备中读取数据
        try:
            r, __, __ = select.select([tuns[i].fileno(), ], [], [], self.config['responsiveness_timeout'])
            if r:
                data = os.read(tuns[i].fileno(), 4096)
            else:
                continue
        except OSError:
            self.run_event.clear()  # 结束所有线程
            print("TUN " + str(i) + " 似乎已经断开连接,无法从中读取数据")
            break

        # 识别IP地址并保存到缓冲区中
        version = data[0]
        version = version >> 4
        if version == 4:
            ip_src = int.from_bytes(data[12:16], byteorder="big")
            ip_dst = int.from_bytes(data[16:20], byteorder="big")
            with self.incoming_packet_buffer_busy:
                self.incoming_packet_buffer[(self.packet_id.value, ip_src, ip_dst)] = data
                if self.config['print_debug']: print(f"从TUN {i} 读取 {(self.packet_id.value, ip_src, ip_dst)}")
                self.packet_id.increment()

    print("TUN " + str(i) + " 退出")
    tuns[i].close()

1.5 物理协调器通信

这段代码通过Protobuf客户端与物理协调器进行通信。它根据配置选择使用UNIX域套接字(AF_UNIX)还是IP套接字(AF_INET)。然后,它尝试连接到指定的服务器地址。在一个循环中,它根据条件发送时间更新请求,等待响应,并解析收到的数据。根据消息的类型,它执行相应的操作,如更新通道数据和递增时间计数器。如果发生错误或超时,会打印错误信息并结束所有线程。最后,它关闭套接字并终止底部Protobuf同步屏障。

请注意,这段代码中涉及到的一些变量和函数,如 self.run_event,self.config,self.protobuf_sync_barrier_top,self.protobuf_sync_barrier_bottom,self.time_counter,self.send_one_message,self.recv_one_message,self.channel_data,phyud.PhysicsUpdate,以及其他未提供的代码片段,需要根据上下文来确定其具体含义和实现方式。

def _run_protobuf_client_phy_coord(self):
    """
    运行物理协调器的Protobuf客户端
    """

    # 连接到服务器的套接字端口
    if self.phy_use_uds:
        # 创建一个UNIX域套接字
        server_address = self.config['phy_uds_server_address']
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    else:
        server_address = (self.config['phy_ip_server_address'], self.config['phy_ip_server_port'])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)

    try:
        if self.try_connecting(sock, server_address, "Error (physics coordinator protobuf client):") == -1:
            self.run_event.clear()
            return
        waiting_for_update = False
        while self.run_event.is_set():
            if not waiting_for_update:
                self.protobuf_sync_barrier_top.wait()
                # 发送时间更新请求
                time_update = phyud.PhysicsUpdate()
                time_update.time_val = self.time_counter.value
                time_update.msg_type = phyud.PhysicsUpdate.BEGIN
                self.send_one_message(sock, gzip.compress(time_update.SerializeToString()))

            waiting_for_update = True
            # 等待完成
            r, __, __ = select.select([sock, ], [], [], self.config['responsiveness_timeout'])
            if r:
                # 获取响应
                data = self.recv_one_message(sock)
                if not data: continue
                waiting_for_update = False

                # 解析时间更新消息
                time_update = phyud.PhysicsUpdate()
                time_update.ParseFromString(gzip.decompress(data))
                if time_update.msg_type != phyud.PhysicsUpdate.END:
                    raise ValueError("Network Coordinator got non-END message from Physics Coordinator!")
                self.channel_data = time_update.channel_data if time_update.channel_data else b""

                i = self.protobuf_sync_barrier_bottom.wait()
                if i == 0:  # 只有一个线程会执行这个
                    self.time_counter.increment()
            else:
                continue
    except socket.error as msg:
        print("Error (physics coordinator protobuf client): %s" % (msg,))
        self.run_event.clear()  # 结束所有线程
    except threading.BrokenBarrierError:
        print("Physics coordinator client: Timeout/Abort while waiting for network simulator client")
        self.run_event.clear()  # 结束所有线程
    finally:
        print('Closing protobuf client socket (physics coordinator )', file=sys.stderr)
        self.protobuf_sync_barrier_bottom.abort()
        sock.close()

1.6 网络协调器通信

这段代码的功能是通过Protobuf与网络模拟器进行通信。它执行以下操作:

  • 根据配置选择使用UNIX域套接字(UDS)还是IP套接字来创建套接字对象。
  • 尝试连接到网络模拟器服务器。
  • 在循环中,发送时间更新请求以获取模拟器的状态。
  • 等待模拟器的更新响应。
  • 处理接收到的更新数据。
  • 根据更新数据,将数据包写入相应的TUN接口。
  • 清理过时的数据包和记录。
  • 最后关闭套接字连接。

请注意,这段代码缺少一些定义,如self.protobuf_sync_barrier_top、self.time_counter、self.incoming_packet_buffer等。因此,要完整运行代码,还需要查看其他部分的实现。

def _run_protobuf_client_net_sim(self, tuns, ip_to_tun_map):
    """
    运行网络模拟器的Protobuf客户端
    """

    # 连接到服务器的套接字端口
    if self.net_use_uds:
        # 创建一个UNIX域套接字
        server_address = self.config['netsim_uds_server_address']
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    else:
        server_address = (self.config['netsim_ip_server_address'], self.config['netsim_ip_server_port'])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    try:
        if self.try_connecting(sock, server_address, "Error (network simulator protobuf client):") == -1:
            self.run_event.clear()
            return

        waiting_for_update = False
        while self.run_event.is_set():
            if not waiting_for_update:
                # 发送时间更新请求
                self.protobuf_sync_barrier_top.wait()
                current_time = self.time_counter.value
                time_update = netud.NetworkUpdate()
                time_update.time_val = current_time
                time_update.msg_type = netud.NetworkUpdate.BEGIN

                # 告诉TUN线程等待缓冲区清空
                with self.incoming_packet_buffer_busy:
                    temp_packet_buffer = self.incoming_packet_buffer
                    self.incoming_packet_buffer = {}
                if temp_packet_buffer:
                    self.dispatch_record[current_time] = []
                for key_tuple, pkt_data in temp_packet_buffer.items():
                    time_update.pkt_id.append(key_tuple[0])
                    time_update.src_ip.append(key_tuple[1])
                    time_update.dst_ip.append(key_tuple[2])
                    time_update.pkt_lengths.append(len(pkt_data))

                    self.dispatch_record[current_time].append(key_tuple)

                self.outgoing_packet_buffer.update(temp_packet_buffer)

                if self.channel_data:
                    time_update.channel_data = self.channel_data

                self.send_one_message(sock, gzip.compress(time_update.SerializeToString()))

            waiting_for_update = True
            # 等待更新
            r, __, __ = select.select([sock, ], [], [], self.config['responsiveness_timeout'])
            if r:
                data = self.recv_one_message(sock)
                if not data: continue
                waiting_for_update = False

                time_update = netud.NetworkUpdate()
                time_update.ParseFromString(gzip.decompress(data))

                for time_update_tuple in zip(time_update.pkt_id, time_update.src_ip, time_update.dst_ip,
                                             time_update.ber, time_update.rx_ip):
                    if not self.run_event.is_set():
                        print("Network simulator was asked to exit while writing data")
                        break
                    ber = time_update_tuple[3]
                    key_tuple = time_update_tuple[:3]
                    if key_tuple not in self.outgoing_packet_buffer:
                        # 数据包已经丢失,可能是由于存储超时导致的
                        print(f"Packet {key_tuple} to be delivered was already dropped")
                        continue
                    data = self.outgoing_packet_buffer[key_tuple]

                    if key_tuple[2] == int(self.config['broadcast_address']):
                        # 将UDP广播(255.255.255.255)转换为单播。
                        data = bytearray(data)
                        data[16:20] = time_update_tuple[4].to_bytes(4, byteorder="big")  # 设置IP
                        data[10:12] = b'\x00\x00'  # 清除校验和
                        data[10:12] = self.generate_ipv4_checksum(data[:20]).to_bytes(2, byteorder="big")
                        data[26:28] = b'\x00\x00'  # 清除UDP校验和

                    if ber == 0:
                        pass
                    elif 0 < ber < 1:
                        if not isinstance(data, bytearray): data = bytearray(data)
                        data = self._apply_ber(ber, data)
                        if not data: continue
                    else:
                        continue

                    try:
                        os.write(tuns[ip_to_tun_map[time_update_tuple[4]]].fileno(), data)
                        if self.config['print_debug']: print(f"Wrote {key_tuple} to TUN {ip_to_tun_map[time_update_tuple[4]]}")
                    except (OSError, KeyError, ValueError) as msg:
                        #print("Network simulator client: Error while writing to TUN. %s" % (msg,))
                        continue
                for key_tuple in zip(time_update.clear_pkt_id, time_update.clear_src_ip, time_update.clear_dst_ip):
                    if key_tuple in self.outgoing_packet_buffer:
                        del self.outgoing_packet_buffer[key_tuple]

                time_to_clear = current_time - self.config['packet_holding_duration']
                if time_to_clear in self.dispatch_record:
                    for key_tuple in self.dispatch_record[time_to_clear]:
                        if key_tuple in self.outgoing_packet_buffer:
                            del self.outgoing_packet_buffer[key_tuple]
                    del self.dispatch_record[time_to_clear]

                if time_update.msg_type != netud.NetworkUpdate.END:
                    raise ValueError("Network Coordinator got non-END message from Network Simulator!")

                i = self.protobuf_sync_barrier_bottom.wait()
                if i == 0:  # 只有一个线程会执行这段代码
                    self.time_counter.increment()

    except socket.error as msg:
        print("Error (network simulator protobuf client): %s" % (msg,))
        self.run_event.clear()  # 结束所有线程
    except threading.BrokenBarrierError:
        print("Network simulator client: Timeout/Abort while waiting for physics coordinator client")
        self.run_event.clear()  # 结束所有线程
    finally:
        self.protobuf_sync_barrier_bottom.abort()
        print('Closing protobuf client socket (network simulator)', file=sys.stderr)
        sock.close()

1.7 IPv4头部的校验和

这段代码的功能是计算IPv4头部的校验和。IPv4头部包含多个字段,其中一个字段就是校验和。校验和用于验证IP头部在传输过程中是否出现了错误或损坏。该方法执行以下操作:

检查IP头部的长度是否为奇数,如果是则在末尾添加一个空字节,以确保字节数组的长度为偶数。 将IP头部按照16位的字节数组进行分组,并计算它们的总和。 将求和结果的高16位与低16位相加,以便将溢出的位添加到总和中。 将相加后的结果的高16位与低16位再次相加,以确保校验和的位数不超过16位。 对结果取反,得到校验和的补码形式。 假设使用小端字节序,将校验和的字节顺序进行调整,确保正确的字节顺序。 返回最终的校验和值。 这段代码的目的是计算IPv4头部的校验和,以便在网络通信中进行数据完整性的验证。

@staticmethod
def generate_ipv4_checksum(ip_header):
    """
    生成IPv4头部的校验和
    """

    # 如果IP头部的长度是奇数,则在末尾添加一个空字节
    if len(ip_header) % 2 == 1:
        ip_header += "\0"

    # 将IP头部按照16位的字节数组进行分组求和
    checksum = sum(array.array("H", ip_header))

    # 将求和结果的高16位与低16位相加
    checksum = (checksum >> 16) + (checksum & 0xffff)

    # 将相加后的结果的高16位与低16位再次相加
    checksum += (checksum >> 16)

    # 取反得到校验和的补码形式
    checksum = ~checksum

    # 假设使用小端字节序,将校验和字节顺序进行调整
    return (((checksum >> 8) & 0xff) | checksum << 8) & 0xffff

1.8 比特错误率应用

这段代码的功能是将比特错误率(BER)应用到给定的数据上。方法执行以下操作:

  • 根据比特错误率(BER)和数据的长度,使用self._get_flip_locations方法获取要翻转(改变)的位置。
  • 对于每个要翻转的位置,检查字节位置是否小于20,如果是,则返回空(不进行翻转)。
  • 对于字节位置大于等于20的情况,使用异或操作(XOR)对指定位置的比特进行翻转,将特定比特位置上的值从0变为1或从1变为0。
  • 返回处理后的数据。

这段代码的目的是模拟网络通信中的比特错误,根据给定的比特错误率随机地翻转数据中的比特位,以模拟传输过程中的数据传输错误。

def _apply_ber(self, ber, data):
    """
    将比特错误率应用到给定数据上
    """

    # 根据比特错误率和数据长度获取要翻转的位置
    for byte_number, bit_to_flip in self._get_flip_locations(ber, len(data)):

        # 如果字节位置小于20,则返回空
        if byte_number < 20:
            return None

        # 对指定位置的比特进行翻转
        data[byte_number] = data[byte_number] ^ 1 << bit_to_flip

    # 返回处理后的数据
    return data

1.9 驱动器请求

这段代码的功能是在驱动器请求之间进行传输。主要步骤如下:

  1. 初始化计时器对象send_stuff_timer为None。
  2. 创建一个互斥锁对象request_set_lock,用于保护驱动器请求集合的并发访问。
  3. 创建一个空的驱动器请求集合request_set。
  4. 定义了一个内部函数send_stuff(),用于发送驱动器请求。
  5. 在主循环中,只要self.run_event为True,就会使用select()函数等待可读取的物理驱动器套接字。
  6. 如果有可读取的套接字,则依次处理其中的每个套接字:
    • 使用互斥锁phy_socket_to_lock保护可读取套接字,接收从套接字收到的消息。
    • 如果收到的消息为空,则跳过。
    • 如果配置中设置了打印调试信息,打印消息已读取。
    • 将收到的消息解码为字符串,并使用互斥锁request_set_lock将其添加到驱动器请求集合中。
  7. 如果驱动器请求集合不为空,并且send_stuff_timer为None或者计时器对象不处于活动状态时,创建并启动一个计时器send_stuff_timer,定时调用send_stuff()函数发送驱动器请求。
  8. 如果没有可读取的套接字,则继续下一次循环。

整体而言,这段代码通过监听物理驱动器套接字,接收驱动器请求,并定期将这些请求发送到网络模拟器。它使用互斥锁来确保对驱动器请求集合的并发访问安全,并使用计时器来定时发送请求。

def _transfer_driver_requests(self, phy_driver_sockets, phy_socket_to_lock, netsim_driver_socket, netsim_lock):
    """
    在驱动器请求之间进行传输
    """

    send_stuff_timer = None  # 定义一个计时器对象
    request_set_lock = threading.Lock()  # 定义一个互斥锁对象
    request_set = set()  # 定义一个集合,用于存储驱动器请求

    def send_stuff():
        """
        发送驱动器请求
        """
        with request_set_lock:
            driver_requests_string = ("[" + ",".join(request_set) + "]").encode("utf-8")  # 将驱动器请求转换为字符串并编码为字节流
            request_set.clear()  # 清空驱动器请求集合
        try:
            with netsim_lock:
                self.send_one_message(netsim_driver_socket, gzip.compress(driver_requests_string))  # 使用网络模拟驱动器套接字发送压缩后的驱动器请求
            if self.config['print_debug']:
                print(f"Wrote driver request to network simulator")  # 打印调试信息(写入驱动器请求到网络模拟器)
        except socket.error as msg:
            print("Error (Driver request transfer): %s" % (msg,))

    while self.run_event.is_set():
        r, __, __ = select.select(phy_driver_sockets, [], [], self.config['responsiveness_timeout'])  # 使用select函数等待可读取的物理驱动器套接字
        if r:
            try:
                for readable_socket in r:
                    with phy_socket_to_lock[readable_socket]:
                        incoming_message_string = self.recv_one_message(readable_socket)  # 接收来自可读取套接字的消息
                    if not incoming_message_string:
                        continue
                    if self.config['print_debug']:
                        print(f"Read driver request")  # 打印调试信息(读取驱动器请求)
                    incoming_message_string = incoming_message_string.decode("utf-8")  # 将收到的消息解码为字符串
                    with request_set_lock:
                        request_set.add(incoming_message_string)  # 将驱动器请求添加到请求集合中
            except socket.error as msg:
                print("Error (Driver request transfer): %s" % (msg,))

            if request_set and (send_stuff_timer is None or not send_stuff_timer.is_alive()):
                send_stuff_timer = threading.Timer(self.config['driver_sync_time'], function=send_stuff)  # 创建一个计时器对象,定时发送驱动器请求
                send_stuff_timer.setDaemon(True)
                send_stuff_timer.start()
        else:
            continue  # 继续下一次循环

1.10 驱动器响应

这段代码的功能是在驱动器响应之间进行传输。主要步骤如下:

  • 创建一个字典mac_to_index,将MAC地址映射到索引,用于快速查找对应的套接字索引。
  • 在主循环中,只要self.run_event为True,就会使用select()函数等待可读取的网络模拟驱动器套接字。
  • 如果有可读取的套接字,则执行以下操作:
    • 使用网络模拟驱动器套接字接收消息。
    • 如果接收到的数据列表为空,则跳过。
    • 如果配置中设置了打印调试信息,打印已从网络模拟器读取驱动器响应的消息。
    • 对解压缩后的数据列表进行遍历,处理每个驱动器响应数据:
      • 获取驱动器响应中的源MAC地址。
      • 如果源MAC地址不存在,则跳过。
      • 根据源MAC地址查找对应的套接字索引。
      • 获取对应的物理驱动器套接字。
      • 使用物理驱动器套接字发送经过JSON编码的驱动器响应数据。
      • 如果配置中设置了打印调试信息,打印已将驱动器响应写入套接字的消息。
  • 如果没有可读取的套接字,则继续下一次循环。

总体而言,这段代码通过监听网络模拟驱动器套接字,接收驱动器响应并将其发送到相应的物理驱动器套接字。它使用了一个映射表来进行源MAC地址的索引查找,以便将驱动器响应发送到正确的物理驱动器套接字。代码还包含了一些调试信息的打印。

def _transfer_driver_responses(self, phy_driver_sockets, phy_socket_locks, netsim_driver_socket, netsim_lock):
    """
    在驱动器响应之间进行传输
    """

    mac_to_index = {mac_i: i for i, mac_i in enumerate(self.config['mac_list'])}  # 创建一个字典,将MAC地址映射到索引

    while self.run_event.is_set():
        r, __, __ = select.select([netsim_driver_socket, ], [], [], self.config['responsiveness_timeout'])  # 使用select函数等待可读取的网络模拟驱动器套接字
        if r:
            try:
                with netsim_lock:
                    data_list = self.recv_one_message(netsim_driver_socket)  # 接收来自网络模拟驱动器套接字的消息
                if not data_list:
                    continue
                if self.config['print_debug']:
                    print(f"Read driver response from network simulator")  # 打印调试信息(从网络模拟器读取驱动器响应)

                for data in json.loads(gzip.decompress(data_list)):  # 解压缩消息并遍历每个驱动器响应数据
                    src_mac = data.get("src_mac", None)  # 获取驱动器响应中的源MAC地址
                    if not src_mac:
                        continue
                    socket_index = mac_to_index[src_mac]  # 根据源MAC地址查找对应的套接字索引
                    appropriate_socket = phy_driver_sockets[socket_index]  # 获取对应的物理驱动器套接字
                    with phy_socket_locks[socket_index]:
                        self.send_one_message(appropriate_socket, json.dumps(data).encode("utf-8"))  # 发送驱动器响应数据到物理驱动器套接字
                    if self.config['print_debug']:
                        print(f"Wrote driver response to socket {socket_index}")  # 打印调试信息(将驱动器响应写入套接字)

            except socket.error as msg:
                print("Error (Driver response transfer): %s" % (msg,))

        else:
            continue  # 继续下一次循环

1.11 驱动器运行

这段代码的功能是运行驱动器进程。主要步骤如下:

  1. 根据配置中的设置,确定使用的套接字类型(UNIX域套接字或IPv4套接字)以及相应的服务器地址。
  2. 创建网络模拟器驱动器套接字对象,并根据需要设置套接字选项。
  3. 尝试连接到网络模拟器驱动器套接字,如果连接失败则返回。
  4. 创建一个线程5. 创建一个线程锁对象,用于保护网络模拟器套接字的访问。
  5. 创建物理驱动器套接字列表,并为每个物理驱动器创建套接字对象。尝试连接到每个物理驱动器套接字,如果连接失败则跳过该物理驱动器。
  6. 创建物理驱动器套接字锁列表,并建立物理驱动器套接字与锁的映射关系。
  7. 创建读取线程,用于在驱动器之间传输请求。
  8. 创建写入线程,用于在驱动器之间传输响应。
  9. 启动读取线程和写入线程。
  10. 等待读取线程和写入线程结束。

总体而言,这段代码的目的是在驱动器之间建立连接,并在不同线程中进行请求和响应的传输。

def _run_driver_process(self, num_interfaces):
    """
    运行驱动器进程
    """

    if self.net_use_uds:
        server_address = self.config['net_driver_uds_server_address']  # 获取网络驱动器的UNIX域套接字地址
        netsim_driver_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # 创建UNIX域套接字对象
    else:
        server_address = (self.config['netsim_ip_server_address'], self.config['netsim_ip_ranging_port'])  # 获取网络模拟器的IP地址和端口号
        netsim_driver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建IPv4套接字对象
        netsim_driver_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # 设置套接字选项,禁用Nagle算法
        netsim_driver_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)  # 设置套接字选项,启用快速ACK

    try:
        if self.try_connecting(netsim_driver_socket, server_address, "Error (Network simulator driver client):") == -1:
            return
        netsim_lock = threading.Lock()  # 创建一个线程锁对象,用于保护网络模拟器套接字的访问

        phy_driver_sockets = []  # 物理驱动器套接字列表
        phy_socket_locks = []  # 物理驱动器套接字锁列表
        phy_socket_to_lock = {}  # 物理驱动器套接字到锁的映射字典
        for i in range(num_interfaces):
            phy_socket_server_address = self.config['phy_driver_uds_server_address'] + str(i)  # 获取物理驱动器的UNIX域套接字地址
            phy_driver_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # 创建UNIX域套接字对象
            if self.try_connecting(phy_driver_socket, phy_socket_server_address, f"Error (Physics coordinator driver client {i}):") == -1:
                continue
            phy_driver_sockets.append(phy_driver_socket)  # 将物理驱动器套接字添加到列表中
            phy_socket_lock = threading.Lock()  # 创建线程锁对象,用于保护物理驱动器套接字的访问
            phy_socket_locks.append(phy_socket_lock)  # 将物理驱动器套接字锁添加到列表中
            phy_socket_to_lock[phy_driver_socket] = phy_socket_lock  # 将物理驱动器套接字与锁的映射关系添加到字典中
    except socket.error as msg:
        if str(msg).strip():
            print("Error (driver process): %s" % (msg,))
        return

    read_thread = threading.Thread(target=self._transfer_driver_requests, args=(
        phy_driver_sockets, phy_socket_to_lock, netsim_driver_socket, netsim_lock))  # 创建读取线程,用于传输驱动器请求
    write_thread = threading.Thread(target=self._transfer_driver_responses, args=(
        phy_driver_sockets, phy_socket_locks, netsim_driver_socket, netsim_lock))  # 创建写入线程,用于传输驱动器响应

    read_thread.start()  # 启动读取线程
    write_thread.start()  # 启动写入线程

    read_thread.join()  # 等待读取线程结束
    write_thread.join()  # 等待写入线程结束

1.12 连接尝试

这段代码的功能是在指定的套接字和地址之间尝试建立连接,并返回连接结果。具体步骤如下:

  1. 初始化连接尝试次数计数器为0。
  2. 在循环中,尝试通过套接字的connect方法连接到指定的地址。
  3. 如果连接成功,跳出循环。
  4. 如果连接失败(捕获到socket.error异常),将连接尝试次数计数器加1,并等待2秒后再进行下一次尝试。
  5. 如果连接尝试次数超过100次,打印连接失败的错误信息。
  6. 返回连接失败的标识(-1)。
  7. 返回连接成功的标识(0)。

该方法的作用是在连接建立之前进行多次尝试,以增加连接成功的概率,并提供一定的错误处理。

@staticmethod
def try_connecting(sock, address, tag):
    """
    尝试在给定的套接字和地址之间建立连接
    """

    i = 0  # 连接尝试次数计数器
    while i < 100:  # 最多尝试100次
        try:
            sock.connect(address)  # 尝试连接套接字和地址
            break  # 如果连接成功,跳出循环
        except socket.error:  # 捕获套接字错误异常
            i += 1  # 连接尝试次数加1
            time.sleep(2)  # 等待2秒后再进行下一次尝试
    else:
        print(tag + " Could not connect")  # 如果连接尝试次数超过100次,打印连接失败的错误信息
        return -1  # 返回连接失败的标识
    return 0  # 返回连接成功的标识

1.13 数据包翻转

这段代码的功能是生成数据包中需要进行位翻转的位置。具体步骤如下:

  1. 初始化当前字节位置为0。
  2. 在循环中,生成满足指定伯努利分布的随机数,表示到下一个错误位的距离。
  3. 将距离转换为字节和位,其中space_to_next_error_byte表示距离下一个错误位的字节数,bit_to_flip表示需要翻转的位。
  4. 更新当前字节位置,将其增加space_to_next_error_byte。
  5. 如果当前字节位置仍然小于数据包长度,则生成当前字节和位翻转的位置。
  6. 重复步骤2到步骤5,直到当前字节位置不小于数据包长度。

通过生成器(yield语句),该方法可以按需逐步生成数据包中需要翻转的位的位置。

@staticmethod
def _get_flip_locations(ber, packet_length):
    """
    生成数据包中翻转位的位置
    """

    current_byte = 0  # 当前字节位置
    while current_byte < packet_length:  # 当前字节位置小于数据包长度时进行循环
        space_to_next_error_bit = np.random.geometric(p=ber)  # 生成满足指定伯努利分布的随机数,表示到下一个错误位的距离
        space_to_next_error_byte, bit_to_flip = divmod(space_to_next_error_bit, 8)  # 将到下一个错误位的距离转换为字节和位
        current_byte = current_byte + space_to_next_error_byte  # 更新当前字节位置
        if current_byte < packet_length:  # 如果当前字节位置仍然小于数据包长度
            yield current_byte, bit_to_flip  # 生成当前字节和位翻转的位置

1.14 数据发送

这段代码的功能是将数据发送到指定的套接字。具体步骤如下:

  1. 获取数据的长度,使用len(data)方法获得数据的字节数。
  2. 使用struct.pack('!I', length)将数据的长度打包为4字节的无符号整数。'!I'表示使用网络字节序(big-endian)对无符号整数进行打包。
  3. 将打包后的数据长度和数据本身进行拼接,得到完整的消息。
  4. 使用套接字的sendall()方法将消息发送到套接字。

通过这段代码,可以将数据按照特定的格式发送到指定的套接字中。在发送消息时,先发送一个4字节的无符号整数表示消息的长度,然后再发送消息的内容。

@staticmethod
def send_one_message(sock, data):
    """
    将数据发送到套接字
    """

    length = len(data)  # 获取数据的长度
    sock.sendall(struct.pack('!I', length) + data)  # 将数据的长度打包为4字节的无符号整数,并与数据本身进行拼接后发送到套接字

1.15 数据接收

这段代码的功能是从套接字接收一条完整的消息。具体步骤如下:

  1. 调用类方法recvall(sock, 4),从套接字接收4字节的数据作为消息的长度。这里使用了类方法recvall来确保接收到指定长度的数据。
  2. 如果接收到的长度数据为空(表示连接已关闭或出错),则返回None。
  3. 使用struct.unpack('!I', lengthbuf)将接收到的长度数据解包为一个无符号整数。'!I'表示使用网络字节序(big-endian)进行解包。
  4. 调用类方法recvall(sock, length),从套接字接收指定长度的消息数据。同样,这里使用了类方法recvall来确保接收到指定长度的数据。
  5. 返回接收到的完整消息数据。

通过这段代码,可以方便地从套接字中接收完整的消息,首先接收4字节的长度信息,然后根据长度信息接收相应长度的消息数据。

@classmethod
def recv_one_message(cls, sock):
    """
    从套接字接收一条消息
    """

    lengthbuf = cls.recvall(sock, 4)  # 调用类方法recvall,从套接字接收4字节的数据作为消息长度
    if not lengthbuf:  # 如果接收到的长度数据为空(表示连接关闭或出错),返回None
        return None
    length, = struct.unpack('!I', lengthbuf)  # 将接收到的长度数据解包为一个无符号整数,使用网络字节序(big-endian)
    return cls.recvall(sock, length)  # 调用类方法recvall,从套接字接收指定长度的消息数据

1.16 recvall

这段代码的功能是从套接字接收指定数量的数据。具体步骤如下:

  1. 创建一个字节数组buf,用于存储接收到的数据。
  2. 在循环中,当还需要接收的数据数量count不为0时,继续接收数据。
  3. 使用sock.recv(count)从套接字接收指定数量的数据,并将其存储在newbuf中。
  4. 如果接收到的数据newbuf为空(表示连接已关闭或出错),则返回None。
  5. 将接收到的数据newbuf追加到存储数据的字节数组buf中。
  6. 更新还需要接收的数据数量count,减去已接收数据的长度len(newbuf)。
  7. 重复步骤2到步骤6,直到接收到指定数量的数据。
  8. 返回接收到的完整数据buf。

通过这段代码,可以确保从套接字中接收到指定数量的完整数据。它在循环中多次调用sock.recv()方法,直到接收到所需的数据量为止,并将接收到的数据存储在字节数组中,然后将其作为结果返回。

@staticmethod
def recvall(sock, count):
    """
    从套接字接收指定数量的数据
    """

    buf = bytearray()  # 创建一个字节数组用于存储接收到的数据
    while count:  # 当还需要接收的数据数量不为0时进行循环
        newbuf = sock.recv(count)  # 从套接字接收指定数量的数据,根据前面,默认传入4个字节
        if not newbuf:  # 如果接收到的数据为空(表示连接已关闭或出错),返回None
            return None
        buf += newbuf  # 将接收到的数据追加到存储数据的字节数组中
        count -= len(newbuf)  # 更新还需要接收的数据数量
    return buf  # 返回接收到的完整数据

1.17 网络协调器运行

这段代码的功能是运行网络协调器。具体步骤如下:

  1. 使用self._setup_network()方法设置网络并获取IP地址列表。
  2. 获取IP地址列表的长度,存储在num_ips变量中。
  3. 定义一些打开TUN设备所需的常量。
  4. 创建一个空列表tuns用于存储打开的TUN设备。
  5. 创建一个空列表tun_threads用于存储TUN线程。
  6. 初始化phy_protobuf_thread和netsim_protobuf_thread为None。
  7. 初始化driver_process为None。
  8. 设置运行事件标志self.run_event为True。
  9. 在try块中执行以下操作:
    • 将IP地址列表转换为字典ip_to_tun_map,其中IP地址映射到TUN设备的索引。
    • 使用open('/dev/net/tun', 'r+b', buffering=0)打开TUN设备,并将其添加到tuns列表中。
    • 使用fcntl.ioctl设置T上部分回答被截断了,以下是继续的回答:
    • 使用fcntl.ioctl设置TUN设备的参数。
    • 启动TUN线程,每个线程读取特定的TUN设备。
    • 启动物理层protobuf线程phy_protobuf_thread和网络仿真protobuf线程netsim_protobuf_thread。
    • 如果配置中do_driver_transfer为True,则启动驱动进程driver_process。
    • 在主线程中阻塞,等待用户输入来停止程序。
    • 如果收到用户输入,抛出KeyboardInterrupt异常来退出程序。
  10. 如果捕获到KeyboardInterrupt异常,输出信息并清除运行事件标志。
  11. 等待所有TUN线程、物理层protobuf线程、网络仿真protobuf线程和驱动进程结束。
  12. 最后,调用self._remove_network(num_ips)方法移除网络。

请注意,这段代码是一个方法的片段,它使用了一些类成员变量和方法,其中的具体实现细节无法确定,因此无法提供完整的功能描述。

def run_network_coordinator(self):
    """
    运行网络协调器
    """

    ip_list = self._setup_network()  # 设置网络并返回IP地址列表
    num_ips = len(ip_list)  # 获取IP地址列表的长度

    # 打开TUN设备的常量
    TUNSETIFF = 0x400454ca
    TUNSETOWNER = TUNSETIFF + 2
    IFF_TUN = 0x0001
    IFF_NO_PI = 0x1000

    tuns = []  # 存储打开的TUN设备
    tun_threads = []  # 存储TUN线程
    phy_protobuf_thread = None  # 物理层protobuf线程
    netsim_protobuf_thread = None  # 网络仿真protobuf线程
    driver_process = None  # 驱动进程

    self.run_event.set()  # 设置运行事件标志

    try:
        # 读取IP地址
        ip_to_tun_map = {int(ipaddress.IPv4Address(ip_i)): i for i, ip_i in enumerate(ip_list)}

        # 打开TUN设备
        for i in range(num_ips):
            tuns.append(open('/dev/net/tun', 'r+b', buffering=0))
            ifri = struct.pack('16sH', b'tun' + str(i).encode('ascii'), IFF_TUN | IFF_NO_PI)
            fcntl.ioctl(tuns[i], TUNSETIFF, ifri)
            fcntl.ioctl(tuns[i], TUNSETOWNER, 1000)

        # 启动TUN线程
        for i in range(num_ips):
            tun_threads.append(threading.Thread(target=self._read_from_tuns, args=(i, tuns,)))
            tun_threads[i].start()

        # 启动protobuf线程
        phy_protobuf_thread = threading.Thread(target=self._run_protobuf_client_phy_coord, args=())
        netsim_protobuf_thread = threading.Thread(target=self._run_protobuf_client_net_sim,
                                                  args=(tuns, ip_to_tun_map))
        phy_protobuf_thread.start()
        netsim_protobuf_thread.start()

        # 启动驱动进程
        if self.config['do_driver_transfer']:
            driver_process = multiprocessing.Process(target=self._run_driver_process, args=(num_ips,))
            driver_process.start()

        # 阻塞主线程
        print("输入任何内容停止\n")
        while self.run_event.is_set():
            r, __, __ = select.select([sys.stdin, ], [], [], self.config['responsiveness_timeout'])
            if r:
                raise KeyboardInterrupt
            else:
                continue
        raise KeyboardInterrupt

    except KeyboardInterrupt:
        print("尝试关闭线程")
        self.run_event.clear()
        if tun_threads:
            for i in tun_threads:
                i.join()
        if phy_protobuf_thread:
            phy_protobuf_thread.join()
        if netsim_protobuf_thread:
            netsim_protobuf_thread.join()
        if driver_process:
            driver_process.join()
        print("线程成功关闭")
    finally:
        self._remove_network(num_ips)  # 移除网络

2.AtomicCounter类

这段代码定义了一个原子计数器类AtomicCounter,它具有以下功能:

  • init(self, initial=0):类的构造函数,用于初始化原子计数器对象。可以传入一个初始值initial,默认为0。
  • increment(self, num=1):原子地将计数器的值增加num(默认为1),并返回增加后的新值。该方法使用了线程锁_lock,以确保在多线程环境下对计数器的操作是原子的。

请注意,原子计数器类的目的是在多线程环境中实现线程安全的计数操作,以避免竞态条件和数据不一致的问题。

class AtomicCounter:
    def __init__(self, initial=0):
        """初始化一个新的原子计数器,初始值为给定的initial(默认为0)"""
        self.value = initial  # 计数器的值
        self._lock = threading.Lock()  # 用于实现原子操作的锁对象

    def increment(self, num=1):
        """原子地将计数器增加num(默认为1),并返回新的值"""
        with self._lock:  # 使用锁对象保证原子操作
            self.value += num  # 增加计数器的值
        return self.value  # 返回新的计数器值

3. 主函数

这段代码的主要功能是作为程序的入口点,并调用main函数来运行网络协调器。具体步骤如下:

  • main(args)函数是程序的入口函数,它接受一个参数args,表示命令行参数。
  • 如果命令行参数的数量不等于2,即不满足len(args) != 2条件,打印使用说明。
  • 如果命令行参数的数量等于2,即满足len(args) == 2条件,说明提供了一个配置文件路径作为参数。
    • 创建一个NetworkCoordinator对象,传入配置文件路径作为参数,即NetworkCoordinator(args[1])。
    • 调用network_coordinator对象的run_network_coordinator()方法,运行网络协调器的功能。
  • return 0表示程序正常退出并返回退出码0。

然后,使用if name == 'main':来判断当前脚本是否作为主程序执行。如果是主程序执行,则调用main(sys.argv)来运行主函数,并使用sys.exit()退出程序,返回main函数的返回值。

该代码的作用是在命令行中运行网络协调器,并根据命令行参数指定的配置文件来配置和控制网络协调器的行为。

def main(args):
    if len(args) != 2:
        print("usage: network_coordinator.py <config_file>")
    else:
        network_coordinator = NetworkCoordinator(args[1])  # 创建一个网络协调器对象,传入配置文件路径作为参数
        network_coordinator.run_network_coordinator()  # 运行网络协调器的功能
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))  # 执行main函数并将命令行参数传递给它,然后使用sys.exit退出程序,返回main函数的返回值
0

评论区