run_protobuf_server(config):这个函数设置一个服务器来接收以protobuf消息形式的网络更新。它创建一个socket,将其绑定到指定的服务器地址上并监听传入连接。一旦建立连接,它接收数据,解析请求,生成响应,使用gzip进行压缩,然后将其发送回客户端。
parse_request(req):这个函数以一个请求(protobuf消息)为输入,使用NetworkUpdate协议缓冲区进行解析,并返回解析后的NetworkUpdate对象。它还调用parse_phy_message函数来解析请求中的任何信道数据。
gen_response(time_update, ip_list):这个函数以一个NetworkUpdate对象和一个IP地址列表为输入。它修改NetworkUpdate对象,将消息类型设置为"END",将目标IP地址列表中的数据包延迟500ms,并向ber和rx_ip字段附加一些数据。最后,它将更新后的NetworkUpdate对象序列化为字符串并返回。
parse_phy_message(data):这个函数以信道数据为输入,使用ChannelData协议缓冲区进行解析,并提取LOS(视野)和路径等信息。
driver_process(config):这个函数设置一个单独的服务器来处理驱动程序请求。它创建一个socket,将其绑定到指定的服务器地址上并监听传入连接。一旦建立连接,它接收驱动程序请求,处理它们,并发送驱动程序响应。
请注意,代码中缺少导入语句和一些必要的依赖项,例如NetworkCoordinator类的定义和select模块的导入。此外,代码中有一些部分是不完整的或者缺少实现(parse_phy_message函数没有返回任何内容,ip_list也没有定义)。
import gzip # 导入gzip模块,用于数据压缩
import json # 导入json模块,用于处理JSON数据
import multiprocessing # 导入multiprocessing模块,用于实现多进程
import os # 导入os模块,用于操作系统相关的功能
import socket # 导入socket模块,用于网络通信
import struct # 导入struct模块,用于处理二进制数据的打包和解包
import time # 导入time模块,用于时间相关操作
import numpy as np # 导入numpy模块,用于数值计算
import select # 导入select模块,用于实现非阻塞I/O
import yaml # 导入yaml模块,用于处理YAML数据
import protobuf_msgs.channel_data_pb2 as cd # 导入channel_data_pb2模块,用于处理protobuf消息
import protobuf_msgs.network_update_pb2 as netud # 导入network_update_pb2模块,用于处理protobuf消息
from network_coordinator import NetworkCoordinator # 导入NetworkCoordinator类
def run_protobuf_server(config):
# 如果配置中指定使用UNIX域套接字(UDS),则使用UDS作为服务器地址
if config['net_use_uds']:
server_address = config['netsim_uds_server_address']
# 确保套接字文件不存在
try:
os.unlink(server_address)
except OSError:
if os.path.exists(server_address):
raise
# 创建UNIX域套接字
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
# 使用IP地址和端口作为服务器地址
server_address = (config['netsim_ip_server_address'], config['netsim_ip_server_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1)
# 将套接字绑定到服务器地址
sock.bind(server_address)
try:
print("Hit Ctrl-c to exit")
sock.listen(1) # 监听连接请求
connection, client_address = sock.accept() # 接受客户端连接
while True:
try:
# 接收数据
data = NetworkCoordinator.recv_one_message(connection)
# 解压缩接收到的数据并解析请求
data = gzip.compress(gen_response(parse_request(gzip.decompress(data)), config['ip_list']))
# 发送响应数据
NetworkCoordinator.send_one_message(connection, data)
except socket.error:
raise KeyboardInterrupt
except KeyboardInterrupt:
print("\nExiting network simulator dummy main process")
finally:
sock.close() # 关闭套接字
if config['net_use_uds']:
os.unlink(server_address)
def parse_request(req):
# 创建NetworkUpdate对象,用于解析请求
time_update = netud.NetworkUpdate()
# 使用protobuf解析请求数据
time_update.ParseFromString(req)
if time_update.channel_data:
parse_phy_message(gzip.decompress(time_update.channel_data))
time_update.channel_data = b""
return time_update
def gen_response(time_update, ip_list):
# 设置消息类型为"END"
time_update.msg_type = netud.NetworkUpdate.END
for i, id_i in enumerate(time_update.pkt_id):
ip_dst = socket.inet_ntoa(struct.pack('!L', time_update.dst_ip[i]))
if ip_dst in ip_list:
print("Packet captured. Destination: %s. Delaying for 500ms." % ip_dst)
time.sleep(0.5) # 延迟500毫秒
time_update.ber.append(1e-9)
time_update.rx_ip.append(time_update.dst_ip[i]) # 不处理广播
del time_update.pkt_lengths[:]
return time_update.SerializeToString()
def parse_request(req):
time_update = netud.NetworkUpdate() # 创建NetworkUpdate对象
time_update.ParseFromString(req) # 从req中解析数据填充到time_update对象
if time_update.channel_data:
parse_phy_message(gzip.decompress(time_update.channel_data)) # 解压缩channel_data并调用parse_phy_message函数处理
time_update.channel_data = b"" # 清空channel_data字段
return time_update # 返回处理后的time_update对象
def gen_response(time_update, ip_list):
time_update.msg_type = netud.NetworkUpdate.END # 设置msg_type字段为END
for i, id_i in enumerate(time_update.pkt_id):
ip_dst = socket.inet_ntoa(struct.pack('!L', time_update.dst_ip[i])) # 将dst_ip转换为IP地址形式
if ip_dst in ip_list:
print("Packet captured. Destination: %s. Delaying for 500ms." % ip_dst) # 打印捕获的数据包目的地并延迟500毫秒
time.sleep(0.5) # 延迟500毫秒
time_update.ber.append(1e-9) # 将1e-9追加到ber列表
time_update.rx_ip.append(time_update.dst_ip[i]) # 将dst_ip[i]追加到rx_ip列表(不处理广播)
del time_update.pkt_lengths[:] # 清空pkt_lengths列表
return time_update.SerializeToString() # 将time_update对象序列化为字符串并返回
def parse_phy_message(data):
channel_data = cd.ChannelData() # 创建ChannelData对象
channel_data.ParseFromString(data) # 从data中解析数据填充到channel_data对象
num_nodes = int(len(channel_data.node_list) / 7) # 计算节点数量
nodes = np.array(channel_data.node_list).reshape((num_nodes, 7)) # 将node_list转换为二维数组形式
los = 0 # 可见光直射路径数量
other = 0 # 其他路径数量
if len(channel_data.path_details):
path_details = {}
for path_detail in channel_data.path_details: # 遍历每对节点的路径细节
path_id = tuple(path_detail.ids) # 将ids转换为元组形式作为路径ID
if not path_detail or not path_id:
continue
path_details[path_id] = {'los': path_detail.los} # 将路径ID和los存储到path_details字典中
los += int(path_detail.los) # 增加los计数
hop_points_index = 0
for num_hops_in_path in path_detail.num_hops: # 遍历给定节点的每条路径
points_in_path = path_detail.hop_points[hop_points_index: hop_points_index + (num_hops_in_path * 4)] # 获取路径中的点
path_details[path_id].setdefault(num_hops_in_path, []).extend(points_in_path) # 将路径中的点存储到path_details字典中
hop_points_index += (num_hops_in_path * 4)
other += 1
else:
path_details = None # 如果没有路径细节,则将path_details设置为None
def driver_process(config):
if config['net_use_uds']:
server_address = config['net_driver_uds_server_address']
try:
os.unlink(server_address)
except OSError:
if os.path.exists(server_address):
raise
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) # 创建UNIX域套接字
else:
server_address = (config['netsim_ip_server_address'], config['netsim_ip_ranging_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 创建IPv4套接字
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 设置TCP_NODELAY选项为1
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_QUICKACK, 1) # 设置TCP_QUICKACK选项为1
try:
sock.bind(server_address) # 绑定套接字到服务器地址
sock.listen(1) # 监听连接请求
driver_connection, _ = sock.accept() # 接受客户端连接请求
while True:
r, __, __ = select.select((driver_connection,), [], [], 2) # 使用select函数等待套接字可读
if not r:
continue
try:
driver_requests = NetworkCoordinator.recv_one_message(driver_connection) # 从driver_connection接收消息
if not driver_requests:
break
driver_requests = json.loads(gzip.decompress(driver_requests)) # 解压缩并解析driver_requests
driver_responses = []
for request in driver_requests:
if request.get("type", None) == "driver_request":
src_mac = request.get("src_mac", None)
dst_mac = request.get("dst_mac", None) if src_mac else None
if dst_mac:
request["type"] = "driver_reply"
driver_responses.append(request)
if driver_responses:
NetworkCoordinator.send_one_message(driver_connection,
gzip.compress(json.dumps(driver_responses).encode("utf-8"))) # 压缩并发送driver_responses
except socket.error:
return
finally:
print("\nExiting network simulator dummy driver process")
sock.close()
if config['net_use_uds']:
os.unlink(server_address) # 删除UNIX域套接字文件
def main(args):
if len(args) != 2:
print("usage: net_sim_dummy.py <config_file>") # 打印用法信息
else:
with open(args[1]) as f:
config = yaml.load(f, Loader=yaml.FullLoader) # 加载配置文件数据
if config['do_driver_transfer']:
ranging_process = multiprocessing.Process(target=driver_process, args=(config,))
ranging_process.start() # 启动进程
run_protobuf_server(config) # 运行协议缓冲区服务器
if config['do_driver_transfer']:
ranging_process.join() # 等待进程结束
return 0
if __name__ == '__main__':
import sys
sys.exit(main(sys.argv))
评论区