-
Notifications
You must be signed in to change notification settings - Fork 6
Open
Description
@dougpopeney wrote the below code to get more features working (thanks Doug!). We should merge it into the main branch, find nicer abstractions than hardcoded hex strings, and test all functions on a Watlow.
"""Drivers for Watlow EZ-Zone temperature controllers."""
import struct
from binascii import unhexlify
import re
import crcmod
import serial
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
# BACnet CRC: https://sourceforge.net/p/bacnet/mailman/message/1259086/
# CRC8 polynomial: X^8 + X^7 + 1 (110000001).
# Used below to checksum the request header.
crc8 = crcmod.mkCrcFun(0b110000001)
# CRC16 polynomial: X^16 + X^12 + X^5 + 1 (10001000000100001).
# Used below to checksum the request body.
crc16 = crcmod.mkCrcFun(0b10001000000100001)
def f_to_c(f):
    """Convert Fahrenheit to Celsius.

    Args:
        f: Temperature in degrees Fahrenheit.

    Returns:
        The same temperature in degrees Celsius, as a float.
    """
    return (f - 32.0) / 1.8
def c_to_f(c):
    """Convert Celsius to Fahrenheit.

    Args:
        c: Temperature in degrees Celsius.

    Returns:
        The same temperature in degrees Fahrenheit, as a float.
    """
    return c * 1.8 + 32.0
class TemperatureController(object):
    """Driver for the Watlow EZ-ZONE temperature controller.

    This driver borrows heavily from this StackOverflow post:
        https://reverseengineering.stackexchange.com/questions/8303/
        rs-485-checksum-reverse-engineering-watlow-ez-zone-pm

    The EZ-Zone communication protocol is Bacnet MS/TP over a serial line.
    There are libraries for this protocol, namely bacpypes, but they don't
    support serial devices. As we only need a handful of commands, we're
    going to manually build the appropriate request strings.

    The request breakdown is:

    Preamble Req/Res Zone ???    Check ???    Register Instance Value    Check
    55ff     05      10   000006 e8    010301 0401     01       00000000 e399

     * Preamble is always 55ff for BACNET MS/TP.
     * Req/Res is a guess. It looks like all requests are 05 and responses
       are 06.
     * Zone, only 10 works. Maybe other zones are for splitting RS-485 out
       into a many-to-one configuration.
     * Nothings don't seem to change between valid requests.
     * First checksum is a custom protocol.
     * Registers 0401 (PV) and 0701 (SP) were the first ones identified; the
       ``commands`` table below lists every register this driver uses.
     * Instance, only 01 works. Current understanding is similar to zone.
     * Second checksum is a custom CRC-16 following Bacnet spec.

    Each ``commands`` entry holds the request header, the request body (a
    value is appended to the body when setting) and the value type used to
    encode/decode it: ``'temp'`` (float, Fahrenheit on the wire, Celsius in
    this API), ``'int'`` (single byte) or anything else (raw float).
    ``responses`` holds, per command, a regex over the hex-encoded reply whose
    group 1 is the value and group 2 is the (currently unverified) checksum.
    """

    # Headers shared by several entries of the command table below.
    _READ_HEADER = unhexlify('0510000006')
    _WRITE_HEADER = unhexlify('051000000a')

    commands = {
        'actual': {'header': _READ_HEADER,
                   'body': unhexlify('010301040101'),
                   'type': 'temp'},
        'setpoint': {'header': _READ_HEADER,
                     'body': unhexlify('010301070101'),
                     'type': 'temp'},
        'mode': {'header': _READ_HEADER,
                 'body': unhexlify('010301080101'),
                 'type': 'int'},
        'min_set': {'header': _READ_HEADER,
                    'body': unhexlify('010301070301'),
                    'type': 'temp'},
        'max_set': {'header': _READ_HEADER,
                    'body': unhexlify('010301070401'),
                    'type': 'temp'},
        'heat_power': {'header': _READ_HEADER,
                       'body': unhexlify('010301081101'),
                       'type': 'percent'},
        'heat_prop_band': {'header': _READ_HEADER,
                           'body': unhexlify('010301080901'),
                           'type': 'temp'},
        'set_setpoint': {'header': _WRITE_HEADER,
                         'body': unhexlify('010407010108'),
                         'type': 'temp'},
        'set_min_set': {'header': _WRITE_HEADER,
                        'body': unhexlify('010407030108'),
                        'type': 'temp'},
        'set_mode': {'header': unhexlify('0510030009'),
                     'body': unhexlify('01140801010f0100'),
                     'type': 'int'},
    }
    responses = {
        'actual': re.compile('^55ff060010000b8802030104010108([0-9a-f]{8})([0-9a-f]{4})$'),
        'setpoint': re.compile('^55ff060010000b8802030107010108([0-9a-f]{8})([0-9a-f]{4})$'),
        'mode': re.compile('^55ff0600100009770203010801010701([0-9a-f]{2})([0-9a-f]{4})$'),
        'min_set': re.compile('^55ff060010000b8802030107030108([0-9a-f]{8})([0-9a-f]{4})$'),
        'max_set': re.compile('^55ff060010000b8802030107040108([0-9a-f]{8})([0-9a-f]{4})$'),
        'heat_power': re.compile('^55ff060010000b8802030108110108([0-9a-f]{8})([0-9a-f]{4})$'),
        'heat_prop_band': re.compile('^55ff060010000b8802030108090108([0-9a-f]{8})([0-9a-f]{4})$'),
        'set_setpoint': re.compile('^55ff060010000a76020407010108([0-9a-f]{8})([0-9a-f]{4})$'),
        'set_min_set': re.compile('^55ff060010000a76020407030108([0-9a-f]{8})([0-9a-f]{4})$'),
        'set_mode': re.compile('^55ff06031000081102140801010701([0-9a-f]{2})([0-9a-f]{4})$'),
    }

    def __init__(self, port, timeout=0.5):
        """Open up a serial connection to the controller.

        This device uses RS-422 instead of RS-232. You will likely need a
        custom converter.

        Args:
            port: Serial port name handed to ``serial.Serial``.
            timeout: Read timeout in seconds for the serial connection.
        """
        self.port = port
        self.baudrate = 38400
        self.timeout = timeout
        self.connection = None
        self.open()

    def open(self):
        """Open up a serial connection to the oven."""
        self.connection = serial.Serial(
            self.port,
            self.baudrate,
            timeout=self.timeout
        )

    def close(self):
        """Close the serial connection. Use on cleanup.

        Does nothing when the connection was never opened or is already
        closed, so it is safe to call more than once.
        """
        if self.connection is None or not self.connection.is_open:
            return
        try:
            self.connection.flush()
        finally:
            # Release the port even if flushing a broken link fails.
            self.connection.close()

    def get(self):
        """Read every supported value from the controller.

        Returns:
            A dict with the keys ``actual``, ``setpoint``, ``mode``,
            ``min_set``, ``max_set``, ``heat_power`` and ``heat_prop_band``.
            Temperature-typed values are converted to Celsius, ``mode`` is an
            int, and ``heat_power`` is the raw float sent by the controller.

        Raises:
            IOError: If the controller does not answer a request correctly.
        """
        readings = ('actual', 'setpoint', 'mode', 'min_set', 'max_set',
                    'heat_power', 'heat_prop_band')
        output = {}
        for key in readings:
            command = self.commands[key]
            output[key] = self._write_and_read(
                request=self._build_request(command['header'],
                                            command['body']),
                # NOTE(review): the 'mode' response pattern is shorter than 21
                # bytes, so that read presumably returns on timeout - confirm
                # on hardware before tightening this.
                length=21,
                check=self.responses[key],
                type=command['type']
            )
        return output

    def set(self, parameter, setpoint):
        """Write a value to the controller and verify the echoed reply.

        Args:
            parameter: Key into ``commands`` (e.g. ``'set_setpoint'``,
                ``'set_min_set'`` or ``'set_mode'``).
            setpoint: Value to send. Temperatures are given in Celsius;
                integer parameters must be an int that fits in one byte.

        Raises:
            ValueError: If the command's value type cannot be encoded.
            IOError: If communication fails or the controller reports a
                different value than the one requested.
        """
        command = self.commands[parameter]
        kind = command['type']
        if kind == 'temp':
            # The controller works in Fahrenheit; this API works in Celsius.
            payload = struct.pack('>f', c_to_f(setpoint))
        elif kind == 'int':
            payload = setpoint.to_bytes(1, 'big')
        else:
            raise ValueError(
                f"Parameter '{parameter}' has type '{kind}', "
                f"which cannot be set.")
        response = self._write_and_read(
            request=self._build_request(command['header'],
                                        command['body'] + payload),
            length=20,
            check=self.responses[parameter],
            type=kind
        )
        # Check setpoint versus response; if not the same raise an error.
        if round(setpoint, 2) != round(response, 2):
            raise IOError(f"Could not change parameter from "
                          f"{response:.2f} to {setpoint:.2f}...")

    @staticmethod
    def _build_request(header, body):
        """Frame a request: preamble, header, CRC8, body, CRC16."""
        header_checksum = struct.pack('<B', ~crc8(header) & 0xff)
        data_checksum = struct.pack('<H', ~crc16(body) & 0xffff)
        return (unhexlify('55ff') + header + header_checksum
                + body + data_checksum)

    @staticmethod
    def _decode(value, kind):
        """Convert the hex value captured from a response to a number."""
        if kind == 'int':
            return int(value, 16)
        number = struct.unpack('>f', unhexlify(value))[0]
        # Temperatures arrive in Fahrenheit; other floats are passed through.
        return f_to_c(number) if kind == 'temp' else number

    def _write_and_read(self, request, length, check, type, retries=3):
        """Write to and read from the device.

        This function abstracts a whole lot of validation checks and error
        handling. The goal is for this driver to be stable to both incomplete
        messages and temporary disconnects.

        The regex parses out the response checksum but does not use it. A
        description of how to calculate it is in the following manual:
            http://www.bacnet.org/Addenda/Add-135-2010an-APR1-1_chair-approved.pdf
        However, my attempts at reproducing did not go well.

        Args:
            request: Complete request frame to send, as bytes.
            length: Number of bytes to read back.
            check: Compiled regex matched against the hex-encoded response.
            type: Value type of the command ('temp', 'int' or other).
            retries: Maximum number of attempts before giving up.

        Returns:
            The decoded value from the first response that matches ``check``.

        Raises:
            IOError: If no valid response arrives within ``retries`` attempts.
        """
        # Every attempt keeps the same value type and uses up one retry, so
        # a dead link ends in IOError after exactly ``retries`` attempts.
        for _ in range(retries):
            if not self.connection.is_open:
                self.open()
            self.connection.flush()
            try:
                self.connection.write(request)
                response = self.connection.read(length)
            except serial.serialutil.SerialException:
                continue
            match = check.match(bytes.hex(response))
            if match:
                # `match.group(2)` is the response checksum (not verified).
                return self._decode(match.group(1), type)
        self.close()
        raise IOError("Could not communicate with Watlow.")


# Originally posted by @patrickfuller in #19 (comment)
Metadata
Metadata
Assignees
Labels
No labels