飞道的博客

使用边缘计算网关分析CAN报文

316人阅读  评论(0)


介绍一个使用边缘网关分析CAN报文,并将其进行数据可视化的应用案例。

硬件介绍

在此应用案例中,使用的传感器是一个加速度传感器,可以输出x、y、z轴三周的加速度。 传感器的通信协议是CAN Open。
使用的网关是虹科Dynagate 10-12

传感器信息收集

首先,使用网关Dynagate 10-12 收集CAN报文。在此之前,需要设置好网关的CAN接口。这里开放我们的can0接口,并且设置好波特率,与我们的传感器匹配。本次案例的传感器波特率是 1000Mpbs

ip link set can0 down
ip link set can0 type can bitrate 1000000
ip link set can0 up

设置好之后,使用命令获取can报文

candump can0

就会得到原始的 raw_data

首先是CAN id, 其次是dlc, 最后是CAN报文内容。

使用python处理CAN 报文

有了 raw data, 还需要有DBC文件对CAN报文进行解析。我们这里使用python 导入dbc文件,并且进行解析

import datetime
#  导入两个python处理can总线的包
import can
import cantools

#  dbc文件路径
DBC_DIR = "/home/hkaco/demo.dbc"

#  加载dbc文件 并创建can总线实例
db = cantools.database.load_file(DBC_DIR)
can_bus = can.interface.Bus('can0', bustype = 'socketcan')

def main():
	while (1):
		#  循环获取can总线的数据
		msg = can_bus.recv()
		#  获取消息并解码
		recv = db.decode_message(msg.arbitration_id, msg.data)
		timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
		print(timestamp)
		print(recv)
		

if __name__ == "__main__":
	main()

通过我们的解析,得到的结果如下图所示:

可以看到,我们成功的使用dbc文件把获取到的raw data解析成了我们能够理解的CAN报文。

将获取到的CAN数据通过UDP协议发送到PC端实现数据可视化

由于边缘网关通常是不带显示设备的,所以我们如果想要直观的看到这个数据产生的结果,可以通过udp协议将网关解析的结果发送到pc端,然后pc端使用matplotlib等数据处理、数据可视化库即可完成。

那么首先是在网关中写好发送的脚本,我们只需要在刚刚的脚本中,添加udp发送的相关代码即可。

import socket
import json
import time
#  导入两个python处理can总线的包
import can
import cantools

#  dbc文件路径
DBC_DIR = "/home/hkaco/demo.dbc"

#  目标ip地址和目标端口,ip地址
DEST_IP = "192.168.189.201"  
DEST_PORT = 6789

#  定义一个通过udp协议发送函数
def send_msg(udp_socket, message):
	udp_socket.sendto(message, (DEST_IP, DEST_PORT))

#  加载dbc文件 并创建can总线实例
db = cantools.database.load_file(DBC_DIR)
can_bus = can.interface.Bus('can0', bustype = 'socketcan')

def main():
	
	udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
	while (1):
		#  循环获取can总线的数据
		msg = can_bus.recv()
		#  获取消息并解码
		recv = db.decode_message(msg.arbitration_id, msg.data)
		#  专为字典格式
		recv = json.dumps(recv)
		send_msg(udp_socket, bytes(recv, encoding="utf-8"))
		time.sleep(0.5)
		
	udp_socket.close()
		

if __name__ == "__main__":
	main()

发送端写好了之后,我们在PC端写好还要写好接收端代码,并完善数据可视化代码

import socket
import matplotlib.pyplot as plt
import time
import datetime
import json
from datetime import datetime

def recv_msg(udp_socket):
    msg = udp_socket.recvfrom(1024)
    # 解码
    recv_ip = msg[1]
    recv_message = msg[0].decode('utf-8')
    recv_dict = json.loads(recv_message)

    #  recv_dict是字典,可以使用键取值

    x = recv_dict['Acc_X']
    y = recv_dict['Acc_Y']
    z = recv_dict['Acc_Z']
    t = recv_dict['T']
    return x, y, z, t

def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    udp_socket.bind(('', 6789))

    
    font1 = {
   'family' : 'Times New Roman',  
    'weight' : 'normal',  
    'size'   : 9,  
    } 
    plt.rcParams['font.sans-serif']=['SimHei']
    
    plt.rcParams['axes.unicode_minus']=False

    plt.ion()

    timel = []
    xl = []
    yl = []
    zl = []
    tl = []

    while True:
        nowtime = datetime.utcnow().strftime("%H:%M:%S")
        x, y, z, t = recv_msg(udp_socket)
        xl.append(x)
        yl.append(y)
        zl.append(z)
        tl.append(t)
        timel.append(nowtime)

        plt.clf()
        plt.subplots_adjust(hspace=0.5)

        ax1 = plt.subplot(211)
        ax1.plot(timel, xl, c="b", label='x')
        ax1.plot(timel, yl, c="r", label='y')
        ax1.plot(timel, zl, c= "y", label='z')
        plt.legend(loc='upper right', prop=font1, frameon=False)
        plt.xticks(rotation = 270)
        plt.xlabel('时间戳')
        plt.ylabel('加速度值')

        ax2 = plt.subplot(212, sharex = ax1)
        ax2.plot(timel, tl, c="c", label='T')
        plt.xticks(rotation = 270)
        plt.xlabel('时间戳')
        plt.ylabel('T')
        plt.legend(loc='upper right', prop=font1, frameon=False)

        plt.pause(0.001)
        plt.ioff()

    socket.close()

if __name__ == "__main__":
    main()

最终结果

首先运行接收端,然后在我们的边缘网关Dynagate 10-12中运行发送端。结果如图所示:



转载:https://blog.csdn.net/m0_49762095/article/details/112668837
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场