基于Modbus协议的串口通信代码
时间: 2023-08-02 15:36:34 浏览: 110
以下是一个基于Modbus协议的串口通信代码示例:
```python
import serial
# Modbus function codes
MODBUS_READ_COILS = 1
MODBUS_READ_DISCRETE_INPUTS = 2
MODBUS_READ_HOLDING_REGISTERS = 3
MODBUS_READ_INPUT_REGISTERS = 4
MODBUS_WRITE_SINGLE_COIL = 5
MODBUS_WRITE_SINGLE_REGISTER = 6
MODBUS_WRITE_MULTIPLE_COILS = 15
MODBUS_WRITE_MULTIPLE_REGISTERS = 16
# Serial port settings
BAUDRATE = 9600
BYTESIZE = 8
PARITY = 'N'
STOPBITS = 1
TIMEOUT = 1

# Function codes grouped by the response layout they produce:
# reads answer with [byte count][data...], writes echo four bytes back.
_READ_FUNCTION_CODES = (
    MODBUS_READ_COILS,
    MODBUS_READ_DISCRETE_INPUTS,
    MODBUS_READ_HOLDING_REGISTERS,
    MODBUS_READ_INPUT_REGISTERS,
)
_WRITE_FUNCTION_CODES = (
    MODBUS_WRITE_SINGLE_COIL,
    MODBUS_WRITE_SINGLE_REGISTER,
    MODBUS_WRITE_MULTIPLE_COILS,
    MODBUS_WRITE_MULTIPLE_REGISTERS,
)

# Largest quantities that still fit in the one-byte "byte count" field of
# the multiple-write requests.
_MAX_WRITE_COILS = 0x07B0
_MAX_WRITE_REGISTERS = 0x007B


class ModbusSerial(object):
    """Minimal Modbus RTU master talking to one device over a serial port.

    Every frame has the layout
    ``[unit address][function code][data...][CRC low][CRC high]``.
    The public ``read_*`` / ``write_*`` methods return the raw payload
    bytes of the response (no decoding into bits or integers is done).
    """

    # Class-level default so the unit address is defined even on an
    # instance whose ``__init__`` has not run.
    slave_id = 0x01

    def __init__(self, port, slave_id=0x01):
        """Open ``port`` with the module's serial settings.

        Args:
            port: Serial device name passed to ``serial.Serial``.
            slave_id: Unit address placed in requests and expected in
                responses. Defaults to ``0x01``.
        """
        self.slave_id = slave_id
        self.ser = serial.Serial(port=port,
                                 baudrate=BAUDRATE,
                                 bytesize=BYTESIZE,
                                 parity=PARITY,
                                 stopbits=STOPBITS,
                                 timeout=TIMEOUT)

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def read_coils(self, address, count):
        """Read ``count`` coils from ``address``; return the packed bytes."""
        return self._transact(MODBUS_READ_COILS, address, count)

    def read_discrete_inputs(self, address, count):
        """Read ``count`` discrete inputs; return the packed bytes."""
        return self._transact(MODBUS_READ_DISCRETE_INPUTS, address, count)

    def read_holding_registers(self, address, count):
        """Read ``count`` holding registers; return 2 bytes per register."""
        return self._transact(MODBUS_READ_HOLDING_REGISTERS, address, count)

    def read_input_registers(self, address, count):
        """Read ``count`` input registers; return 2 bytes per register."""
        return self._transact(MODBUS_READ_INPUT_REGISTERS, address, count)

    def write_single_coil(self, address, value):
        """Switch one coil on (truthy ``value``) or off; return the echo."""
        return self._transact(MODBUS_WRITE_SINGLE_COIL, address, value)

    def write_single_register(self, address, value):
        """Write one 16-bit register; return the 4-byte echo."""
        return self._transact(MODBUS_WRITE_SINGLE_REGISTER, address, value)

    def write_multiple_coils(self, address, values):
        """Write a sequence of coil states; return the 4-byte echo."""
        return self._transact(MODBUS_WRITE_MULTIPLE_COILS, address, values)

    def write_multiple_registers(self, address, values):
        """Write a sequence of 16-bit registers; return the 4-byte echo."""
        return self._transact(MODBUS_WRITE_MULTIPLE_REGISTERS, address, values)

    def _transact(self, function_code, address, data):
        """Build a request, send it, and return the parsed response data."""
        request = self._build_request(function_code, address, data)
        response = self._send_request(request)
        return self._parse_response(response)

    @staticmethod
    def _crc16(frame):
        """Return the Modbus CRC-16 (init 0xFFFF, polynomial 0xA001)."""
        crc = 0xFFFF
        for byte in bytearray(frame):
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    @staticmethod
    def _pack_coils(values):
        """Pack coil states into bytes, first coil in bit 0 of byte 0."""
        packed = [0] * ((len(values) + 7) // 8)
        for index, state in enumerate(values):
            if state:
                packed[index // 8] |= 1 << (index % 8)
        return packed

    @staticmethod
    def _pack_registers(values):
        """Encode each register as two bytes, high byte first."""
        packed = []
        for value in values:
            packed.append((value >> 8) & 0xff)
            packed.append(value & 0xff)
        return packed

    def _build_request(self, function_code, address, data):
        """Return the complete RTU request frame as a ``bytearray``.

        Args:
            function_code: One of the ``MODBUS_*`` function codes.
            address: 16-bit starting address.
            data: Quantity for reads, the value for single writes, or a
                sequence of values for multiple writes.

        Raises:
            ValueError: Unknown function code, or a multiple-write whose
                value count is outside the range the protocol allows.
        """
        header = [self.slave_id, function_code,
                  (address >> 8) & 0xff, address & 0xff]
        if function_code in _READ_FUNCTION_CODES or function_code == MODBUS_WRITE_SINGLE_REGISTER:
            # [quantity or value high][quantity or value low]
            payload = [(data >> 8) & 0xff, data & 0xff]
        elif function_code == MODBUS_WRITE_SINGLE_COIL:
            # The protocol only accepts 0xFF00 (on) and 0x0000 (off) here.
            payload = [0xff, 0x00] if data else [0x00, 0x00]
        elif function_code in (MODBUS_WRITE_MULTIPLE_COILS, MODBUS_WRITE_MULTIPLE_REGISTERS):
            values = list(data)
            if function_code == MODBUS_WRITE_MULTIPLE_COILS:
                limit = _MAX_WRITE_COILS
                packed = self._pack_coils(values)
            else:
                limit = _MAX_WRITE_REGISTERS
                packed = self._pack_registers(values)
            if not 1 <= len(values) <= limit:
                raise ValueError('Invalid number of values: {}'.format(len(values)))
            # [quantity high][quantity low][byte count][data...]
            payload = [(len(values) >> 8) & 0xff, len(values) & 0xff, len(packed)] + packed
        else:
            raise ValueError('Invalid Modbus function code: {}'.format(function_code))
        request = bytearray(header + payload)
        # The CRC goes on the wire low byte first.
        crc = self._crc16(request)
        request.append(crc & 0xff)
        request.append((crc >> 8) & 0xff)
        return request

    def _send_request(self, request):
        """Write ``request`` and read back exactly one response frame.

        The first three bytes reveal how long the frame is, so the rest is
        read with an exact size instead of waiting out the port timeout.

        Raises:
            IOError: Nothing, or only part of a frame, arrived in time.
        """
        # Drop leftovers from an earlier exchange so they are not mistaken
        # for the start of this response.
        self.ser.reset_input_buffer()
        self.ser.write(request)
        head = self.ser.read(3)
        if not head:
            raise IOError('No response received from device')
        if len(head) < 3:
            raise IOError('Incomplete Modbus response: {}'.format(head.hex()))
        function_code = head[1]
        if function_code & 0x80:
            remaining = 2                  # CRC only
        elif function_code in _READ_FUNCTION_CODES:
            remaining = head[2] + 2        # data + CRC
        else:
            remaining = 5                  # rest of the 4-byte echo + CRC
        tail = self.ser.read(remaining)
        response = head + tail
        if len(tail) < remaining:
            raise IOError('Incomplete Modbus response: {}'.format(response.hex()))
        return response

    def _parse_response(self, response):
        """Validate a response frame and return its payload bytes.

        Returns:
            For read functions, the data bytes that follow the byte count.
            For write functions, the four echoed bytes (address followed
            by value or quantity).

        Raises:
            ValueError: The frame is too short, comes from another unit,
                has an unknown function code, or fails the CRC check.
            IOError: The device answered with an exception response.
        """
        frame = bytes(response)
        # Shortest legal frame: address, function, one byte, two CRC bytes.
        if len(frame) < 5 or frame[0] != self.slave_id:
            raise ValueError('Invalid Modbus response: {}'.format(frame.hex()))
        function_code = frame[1]
        is_error = bool(function_code & 0x80)
        if is_error:
            length = 5
        elif function_code in _READ_FUNCTION_CODES:
            length = 3 + frame[2] + 2
        elif function_code in _WRITE_FUNCTION_CODES:
            length = 8
        else:
            raise ValueError('Invalid Modbus response: {}'.format(frame.hex()))
        if len(frame) < length:
            raise ValueError('Invalid Modbus response: {}'.format(frame.hex()))
        frame = frame[:length]
        received_crc = frame[-2] | (frame[-1] << 8)
        if self._crc16(frame[:-2]) != received_crc:
            raise ValueError('Invalid Modbus response: {}'.format(frame.hex()))
        if is_error:
            error_code = frame[2]
            raise IOError('Modbus error code: {}'.format(error_code))
        if function_code in _READ_FUNCTION_CODES:
            return frame[3:-2]
        return frame[2:6]
```
使用示例:
```python
# Open the port, read ten coils starting at address 0x0000, and always
# release the port afterwards, even if the read raises.
modbus = ModbusSerial('/dev/ttyUSB0')
try:
    coils = modbus.read_coils(0x0000, 10)
    print(coils)
finally:
    modbus.ser.close()
```
在上面的示例中，我们创建了一个`ModbusSerial`对象，然后调用`read_coils`方法，从设备0x01的地址0x0000开始读取10个线圈的状态。注意，这里的`/dev/ttyUSB0`是Linux下的串口设备路径；在Windows下应改用`COM3`这类端口名。
相关推荐
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)
![](https://csdnimg.cn/download_wenku/file_type_ask_c1.png)