Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能

一、环境搭建

1.概述

本文介绍如何使用 Python3 和 CAN 总线工具 PCAN-USB,基于官方 PCAN-Basic 库实现 CAN 总线消息的读取与发送功能。

2.PCANBasic.py和PCANBasic.dll下载地址

https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

3.Python安装

下载地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

二、项目展示

1.文件目录

2.xmt文件内容

3.CAN消息读取截图

4.CAN消息发送截图

三、完整代码

#!/usr/bin/python
# _*_ coding:utf-8 _*_
from PCANBasic import *
from queue import *
import threading, time, os


class PcanOperate(PCANBasic, threading.Thread):
    """Read and write CAN frames on one PCAN channel through PCAN-Basic.

    The constructor initializes the channel and ``__del__`` releases it.
    ``PutQueue`` and ``GetQueue`` are meant to be run as thread targets and
    communicate through the module-level names ``q`` (a ``Queue``),
    ``event1`` and ``event2`` (``threading.Event`` objects); those names must
    exist at module level before either method is called.

    NOTE(review): ``threading.Thread`` is listed as a base class, but nothing
    in this file starts the instance itself as a thread; the worker methods
    are run through separate ``threading.Thread`` objects instead.
    """

    def __init__(self, channel=PCAN_USBBUS1, baudrate=PCAN_BAUD_500K):
        """Initialize the CAN channel and print the outcome.

        Args:
            channel: PCAN channel handle used by every method of this
                instance. Defaults to ``PCAN_USBBUS1``.
            baudrate: Bit rate constant passed to ``Initialize``. Defaults to
                ``PCAN_BAUD_500K``.
        """
        # Stored before anything else so that __del__ can always find it.
        self.channel = channel
        super().__init__()
        result = self.Initialize(self.channel, baudrate)
        if result != PCAN_ERROR_OK:
            # GetErrorText returns a tuple; element 1 is the message text.
            print(self.GetErrorText(result)[1])
        else:
            print("PCAN-USB 已初始化")

    def ProcessMessage(self, msg, timestamp):
        """Convert a received CAN message into a dict of printable fields.

        Args:
            msg: Message object exposing ``ID``, ``MSGTYPE``, ``LEN`` and an
                indexable ``DATA`` with at least 8 byte values.
            timestamp: Timestamp delivered together with the message.
                Accepted for interface compatibility; it is not part of the
                returned dict.

        Returns:
            dict with the keys ``CANID`` (upper-case hex string), ``MSGTYPE``,
            ``LEN`` and ``DATA`` (eight two-digit upper-case hex bytes
            separated by single spaces).
        """
        can_id = format(msg.ID, 'X')
        # A 7-digit ID is padded to 8 digits; other lengths are left as is.
        if len(can_id) == 7:
            can_id = '0' + can_id
        return {
            "CANID": can_id,
            "MSGTYPE": msg.MSGTYPE,
            "LEN": msg.LEN,
            # All 8 buffer bytes are dumped, independent of msg.LEN.
            "DATA": ' '.join(format(msg.DATA[i], '02X') for i in range(8)),
        }

    def PutQueue(self):
        """Read CAN messages from the bus and put them on the queue ``q``.

        Loops until the bus status is not OK or a read does not succeed
        (including the "receive queue empty" status). In both cases the error
        text is printed, ``event2`` is set to tell the consumer to stop, and
        the method returns. ``event1`` is set after every enqueued message.
        """
        while True:
            status = self.GetStatus(self.channel)
            if status != PCAN_ERROR_OK:
                print(self.GetErrorText(status)[1])
                event2.set()
                # Stop instead of spinning and re-printing the same error.
                break

            readResult = self.Read(self.channel)  # (status, message, timestamp)
            time.sleep(0.01)
            if readResult[0] != PCAN_ERROR_OK:
                # Covers the empty receive queue as well as real read errors,
                # so a failed read is never enqueued as if it were a frame.
                print(self.GetErrorText(readResult[0])[1])
                event2.set()
                break

            print("消息入队")
            q.put(self.ProcessMessage(readResult[1], readResult[2]))
            event1.set()

    def GetQueue(self):
        """Print messages taken from the queue ``q`` until told to stop.

        Blocks on the queue with a short timeout instead of busy-waiting.
        Once ``event2`` is observed set, the queue is polled one more time so
        that messages enqueued before the stop signal are still printed.
        """
        stopping = False
        while True:
            try:
                item = q.get(timeout=0.05)
            except Empty:
                if stopping:
                    break
                # One more pass after the stop signal drains late arrivals.
                stopping = event2.is_set()
                continue
            print(item)

    def GetXmtMsg(self, filename='zuidazhi.xmt'):
        """Load the messages to transmit from an xmt file.

        The first 14 lines of the file are skipped. Of the remaining lines,
        only those that split into exactly 14 whitespace-separated fields
        are used: field 1 is the hex ID, field 3 the cycle, field 4 the
        length and fields 6-13 the eight hex data bytes. The ID and data
        fields carry one trailing character that is stripped before parsing.

        Args:
            filename: Path of the xmt file, resolved against the current
                working directory. Defaults to ``'zuidazhi.xmt'``.

        Returns:
            list of dicts with the keys ``cycle`` (int, compared against
            milliseconds in ``CanWrite``) and ``msg`` (a ``TPCANMsg``).
        """
        # Read-only access is enough; the file is never written.
        with open(os.path.join(os.getcwd(), filename), 'r') as f:
            lines = f.readlines()

        allcandata = []
        for line in lines[14:]:
            fields = line.strip().split()
            if len(fields) != 14:  # only rows carrying 8 data bytes
                continue
            msg = TPCANMsg()
            msg.ID = int(fields[1][:-1], 16)
            # Every frame is sent as an extended frame.
            msg.MSGTYPE = PCAN_MESSAGE_EXTENDED
            msg.LEN = int(fields[4])
            for k in range(8):
                msg.DATA[k] = int(fields[6 + k][:-1], 16)
            allcandata.append({'cycle': int(fields[3]), 'msg': msg})
        return allcandata

    def CanWrite(self):
        """Send the messages of the xmt file once, then repeat them cyclically.

        Every message is written once and its send time recorded. After a
        one-second pause the method loops forever, re-sending each message
        whenever at least ``cycle`` milliseconds have passed since its last
        successful send. Returns immediately if the file yields no messages.
        """
        allcandata = self.GetXmtMsg()
        print(allcandata)
        if not allcandata:
            # Nothing to send; avoid an endless idle loop.
            return

        for entry in allcandata:
            result = self.Write(self.channel, entry['msg'])
            entry['currenttime'] = int(round(time.time() * 1000))
            if result != PCAN_ERROR_OK:
                print(self.GetErrorText(result)[1])
            else:
                print("时间戳记录成功")

        time.sleep(1)
        while True:
            currenttime = int(round(time.time() * 1000))
            for entry in allcandata:
                if currenttime - entry['currenttime'] < entry['cycle']:
                    continue
                result = self.Write(self.channel, entry['msg'])
                if result != PCAN_ERROR_OK:
                    print(self.GetErrorText(result)[1])
                else:
                    print("消息发送成功")
                    entry['currenttime'] = currenttime
            # Yield briefly so this loop does not monopolize a CPU core.
            time.sleep(0.001)

    def __del__(self):
        """Release the CAN channel and print the outcome."""
        result = self.Uninitialize(self.channel)
        if result != PCAN_ERROR_OK:
            print(self.GetErrorText(result)[1])
        else:
            print("PCAN-USB 已释放")


if __name__ == '__main__':
    # Shared objects that PcanOperate.PutQueue / GetQueue look up by these
    # exact module-level names.
    q = Queue()
    event1, event2 = threading.Event(), threading.Event()
    pcan = PcanOperate()

    # Sender thread, left disabled; uncomment both lines to transmit.
    # s1 = threading.Thread(target=pcan.CanWrite, name="发送CAN消息线程")
    # s1.start()

    # Producer first, then consumer, each started right after the other.
    workers = [
        threading.Thread(target=pcan.PutQueue, name="读取CAN消息并放入队列线程"),
        threading.Thread(target=pcan.GetQueue, name="从队列中读取CAN消息线程"),
    ]
    for worker in workers:
        worker.start()
—————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————
原文地址:https://www.cnblogs.com/airb/p/15540661.html