下面是代码的分解:
导入:脚本导入了各种模块和包,这些模块和包对其功能至关重要,包括 gzip、json、multiprocessing、os、random、socket、subprocess、threading、time、numpy、select、pyquaternion 和 yaml。这些模块提供了网络通信、数据压缩、进程管理等功能。
run_protobuf_server(config):此函数负责运行 protobuf 服务器。它创建一个套接字,监听传入连接,并与客户端进行通信。它接收数据,使用 protobuf 进行解析,生成响应,并将其发送回客户端。
parse_request(req):此函数以请求(protobuf 数据)为输入,并使用 phyud.PhysicsUpdate protobuf 消息进行解析。
gen_response(time_update, num_nodes):此函数根据解析的请求和提供的节点数生成响应(protobuf 数据)。它使用 phyud.PhysicsUpdate protobuf 消息构建响应。
generate_data(num_nodes):此函数为给定的节点数生成随机的信道数据。它创建 cd.ChannelData protobuf 消息的实例,并使用随机值填充。
driver_process(config):此函数表示驱动程序进程。它连接到物理模拟器服务器,发送驱动程序请求,并接收响应。它使用套接字进行通信,并在单独的线程中运行。
main(args):这是脚本的主入口点。它读取配置文件,如果指定,则启动驱动程序进程,并运行 protobuf 服务器。
if name == 'main':这个代码块确保只有当脚本直接运行时才执行主函数,而不是作为模块导入时执行。
总体而言,该脚本建立了一个服务器-客户端架构,其中服务器运行物理模拟器,客户端(驱动程序进程)通过发送请求和接收响应与服务器交互。服务器和客户端之间的通信是使用 protobuf 消息进行的。
import gzip # 导入gzip模块,用于数据压缩
import json # 导入json模块,用于处理JSON数据
import multiprocessing # 导入multiprocessing模块,用于多进程处理
import os # 导入os模块,用于操作系统相关功能
import random # 导入random模块,用于生成随机数
import socket # 导入socket模块,用于网络通信
import subprocess # 导入subprocess模块,用于执行子进程
import threading # 导入threading模块,用于多线程处理
import time # 导入time模块,用于时间相关操作
import numpy as np # 导入numpy模块,用于数值计算
import select # 导入select模块,用于监视文件对象状态
from pyquaternion import Quaternion # 导入pyquaternion模块,用于四元数计算
import yaml # 导入yaml模块,用于处理YAML配置文件
import protobuf_msgs.channel_data_pb2 as cd # 导入channel_data_pb2模块,protobuf消息定义
import protobuf_msgs.physics_update_pb2 as phyud # 导入physics_update_pb2模块,protobuf消息定义
from network_coordinator import NetworkCoordinator # 导入NetworkCoordinator模块
def run_protobuf_server(config):
# 运行protobuf服务器
if config['phy_use_uds']:
server_address = config['phy_uds_server_address']
# 确保套接字文件不存在
try:
os.unlink(server_address)
except OSError:
if os.path.exists(server_address):
raise
# 创建UNIX域套接字
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
server_address = (config['phy_ip_server_address'], config['phy_ip_server_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
sock.bind(server_address) # 绑定服务器地址
try:
prev_time = 0
print("Hit Ctrl-c to exit")
sock.listen(1) # 监听连接请求
connection, client_address = sock.accept() # 接受客户端连接
while True:
try:
num_nodes = len(config['mac_list']) # 获取节点数量
data = gzip.decompress(NetworkCoordinator.recv_one_message(connection)) # 接收并解压数据
data = gzip.compress(gen_response(parse_request(data), num_nodes)) # 生成并压缩响应数据
cur_time = time.time() # 当前时间
if cur_time - prev_time > 2: # 每隔2秒打印通道数据
prev_time = time.time()
time_update = phyud.PhysicsUpdate()
time_update.ParseFromString(gzip.decompress(data))
channel_data = cd.ChannelData()
channel_data.ParseFromString(gzip.decompress(time_update.channel_data))
print(channel_data)
NetworkCoordinator.send_one_message(connection, data) # 发送响应数据
except socket.error:
raise KeyboardInterrupt
except KeyboardInterrupt:
print("\nExiting physics simulator dummy main process")
finally:
sock.close() # 关闭套接字
if config['phy_use_uds']:
os.unlink(server_address) # 删除UNIX域套接字文件
def parse_request(req):
time_update = phyud.PhysicsUpdate()
time_update.ParseFromString(req)
return time_update # 解析请求数据
def gen_response(time_update, num_nodes):
time_update.msg_type = phyud.PhysicsUpdate.END # 设置消息类型为END
data = generate_data(num_nodes) # 生成通道数据
time_update.channel_data = gzip.compress(data) # 压缩通道数据
return time_update.SerializeToString() # 序列化响应数据
def generate_data(num_nodes):
channel_data = cd.ChannelData() # 创建 ChannelData 对象
for __ in range(num_nodes): # 根据节点数量生成随机数据
channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))]) # 生成随机浮点数并添加到节点列表中
channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))])
channel_data.node_list.extend([float(20 * np.random.ranf((1, 1)))])
channel_data.node_list.extend(Quaternion.random()) # 生成随机四元数并添加到节点列表中
los = 0
other = 0
for i in range(random.randint(1, 2)): # 随机选择节点对
path_details = channel_data.path_details.add()
a, b = random.sample((range(1, num_nodes + 1)), k=2) # 从节点集合中随机选择两个节点
path_details.ids.extend([a, b]) # 将节点对的ID添加到路径细节中
path_details.los = True # 设置路径为LOS(Line of Sight)
los += int(path_details.los)
for j in range(random.randint(0, 3)): # 随机选择路径数量
k = random.randint(1, 3) # 随机选择路径中的跳数
path_details.num_hops.append(k) # 将跳数添加到路径细节中
path_details.hop_points.extend(np.random.rand(1, (4 * k)).tolist()[0]) # 生成随机浮点数并添加到路径细节中
other += 1
return channel_data.SerializeToString() # 将 ChannelData 对象序列化为字符串并返回
def driver_process(config):
addresses = [] # 存储地址的列表
socket_list = [] # 存储套接字的列表
try:
addresses = [config['phy_driver_uds_server_address'] + str(i) for i, mac_i in enumerate(config['mac_list'])] # 根据配置信息生成地址列表
def connect(id, addr):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 创建 Unix 域套接字
sock.bind(addr) # 绑定地址
socket_list.append(sock) # 将套接字添加到列表中
sock.listen(1) # 监听连接请求
connection, _ = sock.accept() # 接受连接请求并返回连接对象和地址
print(f"Connected to {addr}")
if id == 1: time.sleep(0.5) # 如果是第二个连接,则延迟0.5秒
mac_tuple = ("1a", "2a")
src_mac, dst_mac = mac_tuple if id == 0 else mac_tuple[::-1] # 根据连接的序号选择源MAC地址和目标MAC地址
request = '''{"type": "driver_request","src_mac": "%s","dst_mac": "%s"}''' % (src_mac, dst_mac) # 创建请求消息
try:
while True:
print(f"Sent: " + (request)) # 打印发送的消息
NetworkCoordinator.send_one_message(connection, request.encode("utf-8")) # 发送消息
r, __, __ = select.select([connection, ], [], [], 0) # 检查是否有可读取的数据
if r: print(f"Received: " + (NetworkCoordinator.recv_one_message(connection)).decode("utf-8")) # 如果有数据可读取,则打印接收的消息
time.sleep(5) # 延迟5秒
except socket.error:
return
threads = [] # 存储线程的列表
id = 0
for a in addresses:
thread = threading.Thread(target=connect, args=(id, a,)) # 创建线程,并指定连接函数和参数
thread.setDaemon(True) # 设置线程为守护线程
thread.start() # 启动线程
threads.append(thread) # 将线程添加到列表中
id += 1
for t in threads: t.join() # 等待所有线程结束
finally:
print("\nExiting physics simulator dummy driver process") # 打印退出消息
for sock in socket_list: sock.close() # 关闭所有套接字
for address in addresses: os.unlink(address) # 删除所有地址文件
def main(args):
if len(args) != 2:
print("usage: phy_sim_dummy.py <config_file>") # 打印用法信息
else:
with open(args[1]) as f:
config = yaml.load(f, Loader=yaml.FullLoader) # 加载配置文件
if config['do_driver_transfer']:
ranging_process = multiprocessing.Process(target=driver_process, args=(config,)) # 创建子进程并指定驱动程序函数和参数
ranging_process.start() # 启动子进程
run_protobuf_server(config) # 运行协议缓冲区服务器功能(未提供代码)
if config['do_driver_transfer']:
ranging_process.join() # 等待子进程结束
return 0 # 返回值为0
# 程序入口
if __name__ == "__main__":
import sys
sys.exit(main(sys.argv))
评论区