Python实现CAN总线J1939报文接收、发送

一、环境搭建

1.概述

本文主要是通过Python3实现CAN总线上J1939报文接收、发送等功能,通过模拟单帧、多帧实现周期性发送报文等模拟场景。

2.CAN工具

本案例采用的是PCAN-USB工具

PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

3.Python安装

下载地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

 库:pip install can-j1939

二、项目演示

1.J1939报文接收

2.J1939报文发送

 用另一台CAN工具查看报文,接收正常:

三、完整代码

#!/usr/bin/python
# _*_ coding:utf-8 _*_

import time, j1939, threading


def on_message(priority, pgn, sa, timestamp, data):
    """接收来自总线的消息"""
    print(priority, pgn, sa, timestamp, [data[i] for i in range(len(data))])  # 打印消息


def recv_j1939(read_time):
    """订阅总线消息"""
    ecu.subscribe(on_message)
    time.sleep(read_time)
    ecu.unsubscribe(on_message)  # 停止订阅消息


def cycle_send_j1939():
    """发送j1939报文"""
    j1939_loog_data = [{'cycle': 1000, "pgn": 65251, "dp": 0, "pf": 254, "ps": 227, "p": 6, "sa": 0,
                        "data": [0, 25, 132, 112, 63, 137, 64, 31, 133, 128, 37, 133, 32, 53, 136, 155, 66, 255, 255,
                                 164, 3, 0, 0,60, 70, 213, 125, 133, 155, 66, 134, 0, 255, 255, 255, 255, 255, 255, 255,255]}]  # j1939长帧报文
    j1939_data = [{'cycle': 250, "can_id": 0x18FEDF00, 'pgn': 65247, "data": [130, 128, 37, 125, 251, 255, 255, 240]},
                  {'cycle': 50, "can_id": 0x18F00400, "pgn": 61444, "data": [14, 189, 130, 0, 0, 0, 0, 125]},
                  {'cycle': 100, "can_id": 0x18FE9200, "pgn": 65170,"data": [255, 255, 254, 255, 255, 255, 0, 0]}]  # j1939短帧报文
    for i in j1939_loog_data:
        i['currenttime'] = int(round(time.time() * 1000))
    for i in j1939_data:
        i['currenttime'] = int(round(time.time() * 1000))
    while True:
        currenttime = int(round(time.time() * 1000))
        for i in j1939_loog_data:
            interval = currenttime - i['currenttime']
            if interval >= i['cycle']:
                ecu.send_pgn(data_page=i["dp"], pdu_format=i["pf"], pdu_specific=i["ps"], priority=i["p"],
                             src_address=i["sa"], data=i["data"])  # 发送长帧报文
                print('PGN%d:长帧报文发送成功' % i['pgn'])
                i['currenttime'] = currenttime  # 更新发送时间
        for i in j1939_data:
            interval = currenttime - i['currenttime']
            if interval >= i['cycle']:
                ecu.send_message(can_id=i["can_id"], data=i["data"])  # 发送短帧报文
                print('PGN%d:短帧报文发送成功' % i['pgn'])
                i['currenttime'] = currenttime  # 更新发送时间


if __name__ == '__main__':
    print("初始化总线")
    ecu = j1939.ElectronicControlUnit()  # 创建ECU
    ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000)  # 连接CAN总线
    # s1 = threading.Thread(target=recv_j1939, name="接收J1939消息线程",args=(10,))
    # s1.start()
    s2 = threading.Thread(target=cycle_send_j1939, name="发送J1939消息线程")
    s2.start()
    # print('取消初始化')
    # ecu.disconnect()   #断开总线连接
—————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————
原文地址:https://www.cnblogs.com/airb/p/15668992.html