目 录CONTENT

文章目录

phy_sim_dummy

Rho
Rho
2023-08-19 / 0 评论 / 0 点赞 / 30 阅读 / 8900 字
一个实现物理模拟器的 Python 脚本。它定义了几个函数和脚本的主入口点。

下面是代码的分解:

导入:脚本导入了各种模块和包,这些模块和包对其功能至关重要,包括 gzip、json、multiprocessing、os、random、socket、subprocess、threading、time、numpy、select、pyquaternion 和 yaml。这些模块提供了网络通信、数据压缩、进程管理等功能。

run_protobuf_server(config):此函数负责运行 protobuf 服务器。它创建一个套接字,监听传入连接,并与客户端进行通信。它接收数据,使用 protobuf 进行解析,生成响应,并将其发送回客户端。

parse_request(req):此函数以请求(protobuf 数据)为输入,并使用 phyud.PhysicsUpdate protobuf 消息进行解析。

gen_response(time_update, num_nodes):此函数根据解析的请求和提供的节点数生成响应(protobuf 数据)。它使用 phyud.PhysicsUpdate protobuf 消息构建响应。

generate_data(num_nodes):此函数为给定的节点数生成随机的信道数据。它创建 cd.ChannelData protobuf 消息的实例,并使用随机值填充。

driver_process(config):此函数表示驱动程序进程。它连接到物理模拟器服务器,发送驱动程序请求,并接收响应。它使用套接字进行通信,并在单独的线程中运行。

main(args):这是脚本的主入口点。它读取配置文件,如果指定,则启动驱动程序进程,并运行 protobuf 服务器。

if name == 'main':这个代码块确保只有当脚本直接运行时才执行主函数,而不是作为模块导入时执行。

总体而言,该脚本建立了一个服务器-客户端架构,其中服务器运行物理模拟器,客户端(驱动程序进程)通过发送请求和接收响应与服务器交互。服务器和客户端之间的通信是使用 protobuf 消息进行的。

import gzip  # 导入gzip模块,用于数据压缩
import json  # 导入json模块,用于处理JSON数据
import multiprocessing  # 导入multiprocessing模块,用于多进程处理
import os  # 导入os模块,用于操作系统相关功能
import random  # 导入random模块,用于生成随机数
import socket  # 导入socket模块,用于网络通信
import subprocess  # 导入subprocess模块,用于执行子进程
import threading  # 导入threading模块,用于多线程处理
import time  # 导入time模块,用于时间相关操作

import numpy as np  # 导入numpy模块,用于数值计算
import select  # 导入select模块,用于监视文件对象状态
from pyquaternion import Quaternion  # 导入pyquaternion模块,用于四元数计算
import yaml  # 导入yaml模块,用于处理YAML配置文件

import protobuf_msgs.channel_data_pb2 as cd  # 导入channel_data_pb2模块,protobuf消息定义
import protobuf_msgs.physics_update_pb2 as phyud  # 导入physics_update_pb2模块,protobuf消息定义
from network_coordinator import NetworkCoordinator  # 导入NetworkCoordinator模块


def run_protobuf_server(config):
    # 运行protobuf服务器
    if config['phy_use_uds']:
        server_address = config['phy_uds_server_address']
        # 确保套接字文件不存在
        try:
            os.unlink(server_address)
        except OSError:
            if os.path.exists(server_address):
                raise
        # 创建UNIX域套接字
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    else:
        server_address = (config['phy_ip_server_address'], config['phy_ip_server_port'])
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)

    sock.bind(server_address)  # 绑定服务器地址

    try:
        prev_time = 0
        print("Hit Ctrl-c to exit")
        sock.listen(1)  # 监听连接请求
        connection, client_address = sock.accept()  # 接受客户端连接
        while True:
            try:
                num_nodes = len(config['mac_list'])  # 获取节点数量
                data = gzip.decompress(NetworkCoordinator.recv_one_message(connection))  # 接收并解压数据
                data = gzip.compress(gen_response(parse_request(data), num_nodes))  # 生成并压缩响应数据
                cur_time = time.time()  # 当前时间
                if cur_time - prev_time > 2:  # 每隔2秒打印通道数据
                    prev_time = time.time()
                    time_update = phyud.PhysicsUpdate()
                    time_update.ParseFromString(gzip.decompress(data))
                    channel_data = cd.ChannelData()
                    channel_data.ParseFromString(gzip.decompress(time_update.channel_data))
                    print(channel_data)

                NetworkCoordinator.send_one_message(connection, data)  # 发送响应数据
            except socket.error:
                raise KeyboardInterrupt

    except KeyboardInterrupt:
        print("\nExiting physics simulator dummy main process")

    finally:
        sock.close()  # 关闭套接字
        if config['phy_use_uds']:
            os.unlink(server_address)  # 删除UNIX域套接字文件

def parse_request(req):
    time_update = phyud.PhysicsUpdate()
    time_update.ParseFromString(req)
    return time_update  # 解析请求数据

def gen_response(time_update, num_nodes):
    time_update.msg_type = phyud.PhysicsUpdate.END  # 设置消息类型为END
    data = generate_data(num_nodes)  # 生成通道数据
    time_update.channel_data = gzip.compress(data)  # 压缩通道数据
    return time_update.SerializeToString()  # 序列化响应数据

def generate_data(num_nodes):
    channel_data = cd.ChannelData()  # 创建 ChannelData 对象

    for __ in range(num_nodes):  # 根据节点数量生成随机数据
        channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))])  # 生成随机浮点数并添加到节点列表中
        channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))])
        channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))])
        channel_data.node_list.extend(Quaternion.random())  # 生成随机四元数并添加到节点列表中

    los = 0
    other = 0

    for i in range(random.randint(1, 2)):  # 随机选择节点对
        path_details = channel_data.path_details.add()
        a, b = random.sample((range(1, num_nodes + 1)), k=2)  # 从节点集合中随机选择两个节点

        path_details.ids.extend([a, b])  # 将节点对的ID添加到路径细节中
        path_details.los = True  # 设置路径为LOS(Line of Sight)
        los += int(path_details.los)

        for j in range(random.randint(0, 3)):  # 随机选择路径数量
            k = random.randint(1, 3)  # 随机选择路径中的跳数
            path_details.num_hops.append(k)  # 将跳数添加到路径细节中
            path_details.hop_points.extend(np.random.rand(1, (4 * k)).tolist()[0])  # 生成随机浮点数并添加到路径细节中
            other += 1

    return channel_data.SerializeToString()  # 将 ChannelData 对象序列化为字符串并返回


def driver_process(config):
    addresses = []  # 存储地址的列表
    socket_list = []  # 存储套接字的列表
    try:
        addresses = [config['phy_driver_uds_server_address'] + str(i) for i, mac_i in enumerate(config['mac_list'])]  # 根据配置信息生成地址列表

        def connect(id, addr):
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)  # 创建 Unix 域套接字
            sock.bind(addr)  # 绑定地址
            socket_list.append(sock)  # 将套接字添加到列表中
            sock.listen(1)  # 监听连接请求
            connection, _ = sock.accept()  # 接受连接请求并返回连接对象和地址
            print(f"Connected to {addr}")
            if id == 1: time.sleep(0.5)  # 如果是第二个连接,则延迟0.5秒
            mac_tuple = ("1a", "2a")
            src_mac, dst_mac = mac_tuple if id == 0 else mac_tuple[::-1]  # 根据连接的序号选择源MAC地址和目标MAC地址

            request = '''{"type": "driver_request","src_mac": "%s","dst_mac": "%s"}''' % (src_mac, dst_mac)  # 创建请求消息

            try:
                while True:
                    print(f"Sent: " + (request))  # 打印发送的消息
                    NetworkCoordinator.send_one_message(connection, request.encode("utf-8"))  # 发送消息
                    r, __, __ = select.select([connection, ], [], [], 0)  # 检查是否有可读取的数据
                    if r: print(f"Received: " + (NetworkCoordinator.recv_one_message(connection)).decode("utf-8"))  # 如果有数据可读取,则打印接收的消息
                    time.sleep(5)  # 延迟5秒
            except socket.error:
                return

        threads = []  # 存储线程的列表
        id = 0
        for a in addresses:
            thread = threading.Thread(target=connect, args=(id, a,))  # 创建线程,并指定连接函数和参数
            thread.setDaemon(True)  # 设置线程为守护线程
            thread.start()  # 启动线程
            threads.append(thread)  # 将线程添加到列表中
            id += 1
        for t in threads: t.join()  # 等待所有线程结束

    finally:
        print("\nExiting physics simulator dummy driver process")  # 打印退出消息
        for sock in socket_list: sock.close()  # 关闭所有套接字
        for address in addresses: os.unlink(address)  # 删除所有地址文件


def main(args):
    if len(args) != 2:
        print("usage: phy_sim_dummy.py <config_file>")  # 打印用法信息
    else:
        with open(args[1]) as f:
            config = yaml.load(f, Loader=yaml.FullLoader)  # 加载配置文件
            if config['do_driver_transfer']:
                ranging_process = multiprocessing.Process(target=driver_process, args=(config,))  # 创建子进程并指定驱动程序函数和参数
                ranging_process.start()  # 启动子进程
            run_protobuf_server(config)  # 运行协议缓冲区服务器功能(未提供代码)
            if config['do_driver_transfer']:
                ranging_process.join()  # 等待子进程结束
    return 0  # 返回值为0

# 程序入口
if __name__ == "__main__":
    import sys
    sys.exit(main(sys.argv))
0

评论区