目 录CONTENT

文章目录

net_sim_dummy

Rho
Rho
2023-08-18 / 0 评论 / 0 点赞 / 39 阅读 / 10037 字
定义了与网络模拟器相关的几个函数。这里是每个函数的功能解析:

run_protobuf_server(config):这个函数设置一个服务器来接收以protobuf消息形式的网络更新。它创建一个socket,将其绑定到指定的服务器地址上并监听传入连接。一旦建立连接,它接收数据,解析请求,生成响应,使用gzip进行压缩,然后将其发送回客户端。

parse_request(req):这个函数以一个请求(protobuf消息)为输入,使用NetworkUpdate协议缓冲区进行解析,并返回解析后的NetworkUpdate对象。它还调用parse_phy_message函数来解析请求中的任何信道数据。

gen_response(time_update, ip_list):这个函数以一个NetworkUpdate对象和一个IP地址列表为输入。它修改NetworkUpdate对象,将消息类型设置为"END",将目标IP地址列表中的数据包延迟500ms,并向ber和rx_ip字段附加一些数据。最后,它将更新后的NetworkUpdate对象序列化为字符串并返回。

parse_phy_message(data):这个函数以信道数据为输入,使用ChannelData协议缓冲区进行解析,并提取LOS(视野)和路径等信息。

driver_process(config):这个函数设置一个单独的服务器来处理驱动程序请求。它创建一个socket,将其绑定到指定的服务器地址上并监听传入连接。一旦建立连接,它接收驱动程序请求,处理它们,并发送驱动程序响应。

请注意,代码中缺少导入语句和一些必要的依赖项,例如NetworkCoordinator类的定义和select模块的导入。此外,代码中有一些部分是不完整的或者缺少实现(parse_phy_message函数没有返回任何内容,ip_list也没有定义)。

import gzip  # 导入gzip模块,用于数据压缩
import json  # 导入json模块,用于处理JSON数据
import multiprocessing  # 导入multiprocessing模块,用于实现多进程
import os  # 导入os模块,用于操作系统相关的功能
import socket  # 导入socket模块,用于网络通信
import struct  # 导入struct模块,用于处理二进制数据的打包和解包
import time  # 导入time模块,用于时间相关操作

import numpy as np  # 导入numpy模块,用于数值计算
import select  # 导入select模块,用于实现非阻塞I/O
import yaml  # 导入yaml模块,用于处理YAML数据

import protobuf_msgs.channel_data_pb2 as cd  # 导入channel_data_pb2模块,用于处理protobuf消息
import protobuf_msgs.network_update_pb2 as netud  # 导入network_update_pb2模块,用于处理protobuf消息
from network_coordinator import NetworkCoordinator  # 导入NetworkCoordinator类


def run_protobuf_server(config):
    # 如果配置中指定使用UNIX域套接字(UDS),则使用UDS作为服务器地址
    if config['net_use_uds']:
        server_address = config['netsim_uds_server_address']
        # 确保套接字文件不存在
        try:
            os.unlink(server_address)
        except OSError:
            if os.path.exists(server_address):
                raise

        # 创建UNIX域套接字
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    else:
        # 使用IP地址和端口作为服务器地址
        server_address = (config['netsim_ip_server_address'], config['netsim_ip_server_port'])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)

    # 将套接字绑定到服务器地址
    sock.bind(server_address)

    try:
        print("Hit Ctrl-c to exit")
        sock.listen(1)  # 监听连接请求
        connection, client_address = sock.accept()  # 接受客户端连接
        while True:
            try:
                # 接收数据
                data = NetworkCoordinator.recv_one_message(connection)
                # 解压缩接收到的数据并解析请求
                data = gzip.compress(gen_response(parse_request(gzip.decompress(data)), config['ip_list']))
                # 发送响应数据
                NetworkCoordinator.send_one_message(connection, data)
            except socket.error:
                raise KeyboardInterrupt

    except KeyboardInterrupt:
        print("\nExiting network simulator dummy main process")

    finally:
        sock.close()  # 关闭套接字
        if config['net_use_uds']:
            os.unlink(server_address)


def parse_request(req):
    # 创建NetworkUpdate对象,用于解析请求
    time_update = netud.NetworkUpdate()
    # 使用protobuf解析请求数据
    time_update.ParseFromString(req)

    if time_update.channel_data:
        parse_phy_message(gzip.decompress(time_update.channel_data))
    time_update.channel_data = b""
    return time_update


def gen_response(time_update, ip_list):
    # 设置消息类型为"END"
    time_update.msg_type = netud.NetworkUpdate.END
    for i, id_i in enumerate(time_update.pkt_id):
        ip_dst = socket.inet_ntoa(struct.pack('!L', time_update.dst_ip[i]))
        if ip_dst in ip_list:
            print("Packet captured. Destination: %s. Delaying for 500ms." % ip_dst)
            time.sleep(0.5)  # 延迟500毫秒
        time_update.ber.append(1e-9)
        time_update.rx_ip.append(time_update.dst_ip[i])  # 不处理广播
    del time_update.pkt_lengths[:]
    return time_update.SerializeToString()


def parse_request(req):
    time_update = netud.NetworkUpdate()  # 创建NetworkUpdate对象
    time_update.ParseFromString(req)  # 从req中解析数据填充到time_update对象

    if time_update.channel_data:
        parse_phy_message(gzip.decompress(time_update.channel_data))  # 解压缩channel_data并调用parse_phy_message函数处理
    time_update.channel_data = b""  # 清空channel_data字段
    return time_update  # 返回处理后的time_update对象


def gen_response(time_update, ip_list):
    time_update.msg_type = netud.NetworkUpdate.END  # 设置msg_type字段为END
    for i, id_i in enumerate(time_update.pkt_id):
        ip_dst = socket.inet_ntoa(struct.pack('!L', time_update.dst_ip[i]))  # 将dst_ip转换为IP地址形式
        if ip_dst in ip_list:
            print("Packet captured. Destination: %s. Delaying for 500ms." % ip_dst)  # 打印捕获的数据包目的地并延迟500毫秒
            time.sleep(0.5)  # 延迟500毫秒
        time_update.ber.append(1e-9)  # 将1e-9追加到ber列表
        time_update.rx_ip.append(time_update.dst_ip[i])  # 将dst_ip[i]追加到rx_ip列表(不处理广播)
    del time_update.pkt_lengths[:]  # 清空pkt_lengths列表
    return time_update.SerializeToString()  # 将time_update对象序列化为字符串并返回


def parse_phy_message(data):
    channel_data = cd.ChannelData()  # 创建ChannelData对象
    channel_data.ParseFromString(data)  # 从data中解析数据填充到channel_data对象
    num_nodes = int(len(channel_data.node_list) / 7)  # 计算节点数量
    nodes = np.array(channel_data.node_list).reshape((num_nodes, 7))  # 将node_list转换为二维数组形式

    los = 0  # 可见光直射路径数量
    other = 0  # 其他路径数量

    if len(channel_data.path_details):
        path_details = {}
        for path_detail in channel_data.path_details:  # 遍历每对节点的路径细节
            path_id = tuple(path_detail.ids)  # 将ids转换为元组形式作为路径ID
            if not path_detail or not path_id:
                continue
            path_details[path_id] = {'los': path_detail.los}  # 将路径ID和los存储到path_details字典中
            los += int(path_detail.los)  # 增加los计数

            hop_points_index = 0
            for num_hops_in_path in path_detail.num_hops:  # 遍历给定节点的每条路径
                points_in_path = path_detail.hop_points[hop_points_index: hop_points_index + (num_hops_in_path * 4)]  # 获取路径中的点
                path_details[path_id].setdefault(num_hops_in_path, []).extend(points_in_path)  # 将路径中的点存储到path_details字典中

                hop_points_index += (num_hops_in_path * 4)
                other += 1

    else:
        path_details = None  # 如果没有路径细节,则将path_details设置为None


def driver_process(config):
    if config['net_use_uds']:
        server_address = config['net_driver_uds_server_address']
        try:
            os.unlink(server_address)
        except OSError:
            if os.path.exists(server_address):
                raise

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # 创建UNIX域套接字
    else:
        server_address = (config['netsim_ip_server_address'], config['netsim_ip_ranging_port'])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建IPv4套接字
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)  # 设置TCP_NODELAY选项为1
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)  # 设置TCP_QUICKACK选项为1

    try:
        sock.bind(server_address)  # 绑定套接字到服务器地址
        sock.listen(1)  # 监听连接请求
       driver_connection, _ = sock.accept()  # 接受客户端连接请求
        while True:
            r, __, __ = select.select((driver_connection,), [], [], 2)  # 使用select函数等待套接字可读
            if not r:
                continue
            try:
                driver_requests = NetworkCoordinator.recv_one_message(driver_connection)  # 从driver_connection接收消息
                if not driver_requests:
                    break
                driver_requests = json.loads(gzip.decompress(driver_requests))  # 解压缩并解析driver_requests

                driver_responses = []
                for request in driver_requests:
                    if request.get("type", None) == "driver_request":
                        src_mac = request.get("src_mac", None)
                        dst_mac = request.get("dst_mac", None) if src_mac else None
                        if dst_mac:
                            request["type"] = "driver_reply"
                            driver_responses.append(request)
                if driver_responses:
                    NetworkCoordinator.send_one_message(driver_connection,
                                          gzip.compress(json.dumps(driver_responses).encode("utf-8")))  # 压缩并发送driver_responses
            except socket.error:
                return
    finally:
        print("\nExiting network simulator dummy driver process")
        sock.close()
        if config['net_use_uds']:
            os.unlink(server_address)  # 删除UNIX域套接字文件

def main(args):
    if len(args) != 2:
        print("usage: net_sim_dummy.py <config_file>")  # 打印用法信息
    else:
        with open(args[1]) as f:
            config = yaml.load(f, Loader=yaml.FullLoader)  # 加载配置文件数据
            if config['do_driver_transfer']:
                ranging_process = multiprocessing.Process(target=driver_process, args=(config,))
                ranging_process.start()  # 启动进程
            run_protobuf_server(config)  # 运行协议缓冲区服务器
            if config['do_driver_transfer']:
                ranging_process.join()  # 等待进程结束
    return 0


if __name__ == '__main__':
    import sys

    sys.exit(main(sys.argv))
0

评论区