0
点赞
收藏
分享

微信扫一扫

Python读写欧姆龙PLC(本工作是由具体项目而开展的)

卿卿如梦 2022-04-06 阅读 36

这里写自定义目录标题

硬件环境介绍

plc(Omron cj2m)、加湿器、加热器、传感器、阀、窗、风机…等等。数据链接

目标

在已有的下位机基础(plc控制系统)上搭建可进行算法运行控制的智能控制系统。

方式

欧姆龙plc所支持的通信方式有 欧姆龙通信

若采用python直接对Omron plc进行通讯则流程较为复杂,开发难度较大,故本系统采用opc通讯作为中间协调 Python——opc——plc的方式实现通讯。OPC服务器通过硬件驱动获取设备的数据,上位监控再获取OPC服务器的数据,数据获取的方式可以是第三方成熟的OPCClient,也可以基于python开发来读/写数据。此方式也将会借助OpenOPC这样的python插件。
OPC是工业控制和生产自动化领域中使用的硬件和软件的接口标准,以便有效地在应用和过程控制设备之间读写数据。O代表OLE(对象链接和嵌入),P (process过程),C (control控制)。
OPC工业控制
OPC实现是通过软件KEPServerEX是第三方的OPC服务器,各不同厂家多种设备下位PLC与上位机之间通讯
系统整体结构

##步骤

KEPServerEX6搭建
第一步:在这里插入图片描述

驱动程序选择Omron Fins Ethernet

第二步
在这里插入图片描述
该处选择对应的plc型号
填写对应的plc地址

第三步
在这里插入图片描述
根据plc程序中的地址而建立需要使用到的标记。

DCOM配置

在这里需要针对客户端(上位机)和服务端(OPC服务器)都要进行DCOM的配置,DCOM的配置资料网上较多,可以自行参考

运行环境

链接: link.
1.安装Python:python3各版本均可,但必须是32位但必须是32位但必须是32位。
2.安装KepServer:各版本均可。这里使用的是Kepware.KEPServerEX.V6,下载:
尝试KEPServerEXV6.4.rar_python读写kepware数据
首先在KepServer中新建Channel1 -> Device1,建立三个变量,连接plc后,在QC下查看数据是否能够正常读取。数据质量显示为good则表示读取正常。
3.注册opcdaauto
将opcdaauto.dll文件置于C:\Windows\System32目录下(使用64位环境时,置于SysWOW64目录下),cmd执行命令:regsvr32 C:\Windows\System32\opcdaauto.dll

python读取代码

读取Omron plc中的数据

import logging
import re
import time
from typing import List

import win32com.client
from win32com.client import gencache

class GroupProperty:
# DeadBand
IsActive: bool = True
IsSubscribed: bool = True
UpdateRate: int = 1000
DefaultGroupIsActive: bool = True
DefaultGroupDeadband: int = 0

class readomron(GroupProperty):
def init(self):
logging.basicConfig(level=‘DEBUG’)
self.logger = logging.getLogger(‘dll_dispatch’)

    # 加载dll对象
    OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
    self.opcServer = OPC_DA_DLL.OPCServer()  # 获取opcServer对象
    self.group_name_map = {}  # 存放已添加的组名
    self.item_name_map = {}
    self.Groups = None
    self.fi = open(r'.\\src\\txt', 'r')

def read(self,):
    self.opcServer = self.connect_opc('Kepware.KEPServerEX.V6', '127.0.0.1')
    # print(get_servers(opcServer))
    # 定义点位列表

    txt = self.fi.readlines()
    tag_list = []

    for w in txt:
        w = w.replace('\n', '')
        tag_list.append(w)

    try:
        while True:
            data = self.sync_read_items(self.opcServer, tag_list)
            for i in data:
                print(i[0])
            time.sleep(2)
    except Exception:
        self.logger.error('异常退出', exc_info=True)
    finally:
        self.opcServer.Disconnect()

    return tag_list

def connect_opc(self,progID: str, node: str = '192.168.250.1'):
    self.opcServer.Connect(progID, node)
    self.logger.info('已连接到opc - [{}:{}]'.format(node, progID))
    return self.opcServer

def disconnect_opc(self,opcServer):
    self.logger.info('opc断开连接')
    opcServer.Disconnect()

def get_groups(self,opcServer, groupProperty: GroupProperty):

    if not self.Groups:
        Groups = opcServer.OPCGroups
        Groups.DefaultGroupIsActive = groupProperty.IsActive
        Groups.DefaultGroupDeadband = groupProperty.DefaultGroupDeadband
        Groups.DefaultGroupUpdateRate = groupProperty.UpdateRate
    return Groups

def get_group(self,opcServer, opcGroupName: str, groupProperty: GroupProperty):
    opcGroups = self.get_groups(opcServer, GroupProperty())
    if opcGroupName not in self.group_name_map:
        opcGroup = opcGroups.Add(opcGroupName)
        opcGroup.IsActive = groupProperty.IsActive
        opcGroup.UpdateRate = groupProperty.UpdateRate
        opcGroup.IsSubscribed = groupProperty.IsSubscribed

        self.group_name_map[opcGroupName] = opcGroup
    else:
        self.logger.debug('重复添加 GroupName - {}'.format(opcGroupName))

    return self.group_name_map[opcGroupName]

def get_items(self,opcServer, opcGroupName: str, groupProperty: GroupProperty):
    group = self.get_group(opcServer, opcGroupName, groupProperty)
    return group.OPCItems

def add_item(self,opcServer, itemName: str):
    """添加一个点位"""
    if itemName not in self.item_name_map:
        try:
            groupName = re.split(r'[.$][^.$]*?$', itemName, 1)[0]
        except IndexError:
            self.logger.error('点位名-{}-不符合规则'.format(itemName))
            return
        items = self.get_items(opcServer, groupName, GroupProperty())
        try:
            item = items.AddItem(itemName, len(self.item_name_map) + 1)
            # 添加失败时 item 为 None
            if not items:
                raise RuntimeError('add item[{}] return None'.format(itemName))
        except Exception as e:
            self.logger.error('添加点位-{}-失败! [点位可能不存在]'.format(itemName), exc_info=True)
            self.item_name_map[itemName] = None
        else:
            item.IsActive = True
            time.sleep(0.1)
            self.item_name_map[itemName] = item
    else:
        self.logger.debug('重复添加 itemName - {}'.format(itemName))

def add_items(self,opcServer, item_list: List[str]):
    """批量添加点位"""
    for item in item_list:
        self.add_item(opcServer, item)

def _sync_read(self,item):
    data = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0)
    # print('......1........')
    if not data:
        raise ValueError('sync_read return None.')
    return data
    item.Write(True)

def sync_read_item(self,opcServer, itemName: str):
    """
    同步读取一个 opc 点位
    :param opcServer:
    :param itemName:
    :return:  (数据, 数据质量, 时间)
    """
    if itemName not in self.item_name_map:
        self.add_item(opcServer, itemName)
    self.logger.debug('读取 - {}'.format(itemName))
    item = self.item_name_map[itemName]
    try:
        if not item:
            raise ValueError('点位不存')
        return self._sync_read(item)
    except Exception as e:
        self.logger.error('采集失败: {} - {}'.format(itemName, e), exc_info=True)
        return None, 0, 0

def sync_read_items(self,opcServer, itemNameList: List[str]):
    result = []
    for itemName in itemNameList:
        if itemName not in self.item_name_map:
            self.add_item(opcServer, itemName)

        result.append(self.sync_read_item(opcServer, itemName))
    return result

``Omron写入代码
import logging
import re
import time
from typing import List

import win32com.client
from win32com.client import gencache
class writeomron():

def __init__(self):
    dll = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
    self.opcserver = dll.OPCServer()
    # for svr in opcserver.GetOPCServers():
    #     print (svr)
    self.opcserver.Connect('KEPware.KEPServerEx.V6')

    groups = self.opcserver.OPCGroups
    groups.DefaultGroupIsActive = True
    groups.DefaultGroupDeadband = 0
    groups.DefaultGroupUpdateRate = 200
    group = groups.Add('Channel1.Device1')

    # group = DispatchWithEvents(groups.Add('Group1'),Group1Event)
    group.IsActive = True
    group.IsSubscribed = True
    group.UpdateRate = 100

    self.items = group.OPCItems

    self.target_dict = {'Channel1.Device1.OutCyclic': False,
               
                   }


def write(self,widget):
    widget_dict={'btn_inner_2':'Channel1.Device1.withinCyclic',
            }

    if widget_dict[widget]:
        target=widget_dict[widget]
        print('1')
        print('target',target)
        item = self.items.AddItem(target, 0)
        da = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0)
        if self.target_dict[target]:
            item.Write(False)
            self.target_dict[target]=False
            print('2')
        else:
            item.Write(True)
            self.target_dict[target] = True
            print('3')

参考博客/文章

举报

相关推荐

0 条评论