drivers/onewire: Move onewire.py, ds18x20.py from esp8266 to drivers.

These drivers can now be used by any port (so long as that port has the
_onewire driver from extmod/modonewire.c).

These drivers replace the existing 1-wire and DS18X20 drivers in the
drivers/onewire directory.  The existing ones were pyboard-specific and
not very efficient nor minimal (although the 1-wire driver was written in
pure Python it only worked at large enough CPU frequency).

This commit brings backwards incompatible API changes to the existing
1-wire drivers.  User code should be converted to use the new drivers, or
check out the old version of the code and keep a local copy (it should
continue to work unchanged).
simmel
Damien George 5 years ago
parent b19138e82e
commit a065d78675
  1. 148
      drivers/onewire/ds18x20.py
  2. 389
      drivers/onewire/onewire.py
  3. 51
      esp8266/modules/ds18x20.py
  4. 91
      esp8266/modules/onewire.py

@ -1,101 +1,51 @@
"""
DS18x20 temperature sensor driver for MicroPython.
This driver uses the OneWire driver to control DS18S20 and DS18B20
temperature sensors. It supports multiple devices on the same 1-wire bus.
The following example assumes the ground of your DS18x20 is connected to
Y11, vcc is connected to Y9 and the data pin is connected to Y10.
>>> from machine import Pin
>>> gnd = Pin('Y11', Pin.OUT_PP)
>>> gnd.off()
>>> vcc = Pin('Y9', Pin.OUT_PP)
>>> vcc.on()
>>> from ds18x20 import DS18X20
>>> d = DS18X20(Pin('Y10'))
Call read_temps to read all sensors:
>>> result = d.read_temps()
>>> print(result)
[20.875, 20.8125]
Call read_temp to read the temperature of a specific sensor:
>>> result = d.read_temp(d.roms[0])
>>> print(result)
20.25
If only one DS18x20 is attached to the bus, then you don't need to
pass a ROM to read_temp:
>>> result = d.read_temp()
>>> print(result)
20.25
"""
from onewire import OneWire
class DS18X20(object):
def __init__(self, pin):
self.ow = OneWire(pin)
# Scan the 1-wire devices, but only keep those which have the
# correct # first byte in their rom for a DS18x20 device.
self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
def read_temp(self, rom=None):
"""
Read and return the temperature of one DS18x20 device.
Pass the 8-byte bytes object with the ROM of the specific device you want to read.
If only one DS18x20 device is attached to the bus you may omit the rom parameter.
"""
rom = rom or self.roms[0]
ow = self.ow
ow.reset()
ow.select_rom(rom)
ow.write_byte(0x44) # Convert Temp
while True:
if ow.read_bit():
break
ow.reset()
ow.select_rom(rom)
ow.write_byte(0xbe) # Read scratch
data = ow.read_bytes(9)
return self.convert_temp(rom[0], data)
def read_temps(self):
"""
Read and return the temperatures of all attached DS18x20 devices.
"""
temps = []
for rom in self.roms:
temps.append(self.read_temp(rom))
return temps
def convert_temp(self, rom0, data):
"""
Convert the raw temperature data into degrees celsius and return as a float.
"""
temp_lsb = data[0]
temp_msb = data[1]
if rom0 == 0x10:
if temp_msb != 0:
# convert negative number
temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
# DS18x20 temperature sensor driver for MicroPython.
# MIT license; Copyright (c) 2016 Damien P. George
from micropython import const
_CONVERT = const(0x44)
_RD_SCRATCH = const(0xbe)
_WR_SCRATCH = const(0x4e)
class DS18X20:
def __init__(self, onewire):
self.ow = onewire
self.buf = bytearray(9)
def scan(self):
return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
def convert_temp(self):
self.ow.reset(True)
self.ow.writebyte(self.ow.SKIP_ROM)
self.ow.writebyte(_CONVERT)
def read_scratch(self, rom):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_RD_SCRATCH)
self.ow.readinto(self.buf)
if self.ow.crc8(self.buf):
raise Exception('CRC error')
return self.buf
def write_scratch(self, rom, buf):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_WR_SCRATCH)
self.ow.write(buf)
def read_temp(self, rom):
buf = self.read_scratch(rom)
if rom[0] == 0x10:
if buf[1]:
t = buf[0] >> 1 | 0x80
t = -((~t + 1) & 0xff)
else:
temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
count_remain = data[6]
count_per_c = data[7]
temp = temp_read - 0.25 + (count_per_c - count_remain) / count_per_c
return temp
elif rom0 == 0x28:
temp = (temp_msb << 8 | temp_lsb) / 16
if (temp_msb & 0xf8) == 0xf8: # for negative temperature
temp -= 0x1000
return temp
t = buf[0] >> 1
return t - 0.25 + (buf[7] - buf[6]) / buf[7]
else:
assert False
t = buf[1] << 8 | buf[0]
if t & 0x8000: # sign bit set
t = -((t ^ 0xffff) + 1)
return t / 16

@ -1,336 +1,91 @@
"""
OneWire library ported to MicroPython by Jason Hildebrand.
# 1-Wire driver for MicroPython
# MIT license; Copyright (c) 2016 Damien P. George
from micropython import const
import _onewire as _ow
TODO:
* implement and test parasite-power mode (as an init option)
* port the crc checks
The original upstream copyright and terms follow.
------------------------------------------------------------------------------
Copyright (c) 2007, Jim Studt (original old version - many contributors since)
OneWire has been maintained by Paul Stoffregen (paul@pjrc.com) since
January 2010.
26 Sept 2008 -- Robin James
Jim Studt's original library was modified by Josh Larios.
Tom Pollard, pollard@alum.mit.edu, contributed around May 20, 2008
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Much of the code was inspired by Derek Yerger's code, though I don't
think much of that remains. In any event that was..
(copyleft) 2006 by Derek Yerger - Free to distribute freely.
"""
import pyb
from pyb import disable_irq
from pyb import enable_irq
class OneWireError(Exception):
pass
class OneWire:
def __init__(self, pin):
"""
Pass the data pin connected to your one-wire device(s), for example Pin('X1').
The one-wire protocol allows for multiple devices to be attached.
"""
self.data_pin = pin
self.write_delays = (1, 40, 40, 1)
self.read_delays = (1, 1, 40)
# cache a bunch of methods and attributes. This is necessary in _write_bit and
# _read_bit to achieve the timing required by the OneWire protocol.
self.cache = (pin.init, pin.value, pin.OUT_PP, pin.IN, pin.PULL_UP)
pin.init(pin.IN, pin.PULL_UP)
def reset(self):
"""
Perform the onewire reset function.
Returns 1 if a device asserted a presence pulse, 0 otherwise.
If you receive 0, then check your wiring and make sure you are providing
power and ground to your devices.
"""
retries = 25
self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
# We will wait up to 250uS for
# the bus to come high, if it doesn't then it is broken or shorted
# and we return a 0;
# wait until the wire is high... just in case
while True:
if self.data_pin.value():
break
retries -= 1
if retries == 0:
raise OSError("OneWire pin didn't go high")
pyb.udelay(10)
# pull the bus low for at least 480us
self.data_pin.low()
self.data_pin.init(self.data_pin.OUT_PP)
pyb.udelay(480)
SEARCH_ROM = const(0xf0)
MATCH_ROM = const(0x55)
SKIP_ROM = const(0xcc)
# If there is a slave present, it should pull the bus low within 60us
i = pyb.disable_irq()
self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_UP)
pyb.udelay(70)
presence = not self.data_pin.value()
pyb.enable_irq(i)
pyb.udelay(410)
return presence
def write_bit(self, value):
"""
Write a single bit.
"""
pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
self._write_bit(value, pin_init, pin_value, Pin_OUT_PP)
def __init__(self, pin):
self.pin = pin
self.pin.init(pin.OPEN_DRAIN)
def _write_bit(self, value, pin_init, pin_value, Pin_OUT_PP):
"""
Write a single bit - requires cached methods/attributes be passed as arguments.
See also write_bit()
"""
d0, d1, d2, d3 = self.write_delays
udelay = pyb.udelay
if value:
# write 1
i = disable_irq()
pin_value(0)
pin_init(Pin_OUT_PP)
udelay(d0)
pin_value(1)
enable_irq(i)
udelay(d1)
else:
# write 0
i = disable_irq()
pin_value(0)
pin_init(Pin_OUT_PP)
udelay(d2)
pin_value(1)
enable_irq(i)
udelay(d3)
def reset(self, required=False):
reset = _ow.reset(self.pin)
if required and not reset:
raise OneWireError
return reset
def write_byte(self, value):
"""
Write a byte. The pin will go tri-state at the end of the write to avoid
heating in a short or other mishap.
"""
pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
for i in range(8):
self._write_bit(value & 1, pin_init, pin_value, Pin_OUT_PP)
value >>= 1
pin_init(Pin_IN, Pin_PULL_UP)
def readbit(self):
return _ow.readbit(self.pin)
def write_bytes(self, bytestring):
"""
Write a sequence of bytes.
"""
for byte in bytestring:
self.write_byte(byte)
def readbyte(self):
return _ow.readbyte(self.pin)
def _read_bit(self, pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP):
"""
Read a single bit - requires cached methods/attributes be passed as arguments.
See also read_bit()
"""
d0, d1, d2 = self.read_delays
udelay = pyb.udelay
pin_init(Pin_IN, Pin_PULL_UP) # TODO why do we need this?
i = disable_irq()
pin_value(0)
pin_init(Pin_OUT_PP)
udelay(d0)
pin_init(Pin_IN, Pin_PULL_UP)
udelay(d1)
value = pin_value()
enable_irq(i)
udelay(d2)
return value
def readinto(self, buf):
for i in range(len(buf)):
buf[i] = _ow.readbyte(self.pin)
def read_bit(self):
"""
Read a single bit.
"""
pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
return self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
def writebit(self, value):
return _ow.writebit(self.pin, value)
def read_byte(self):
"""
Read a single byte and return the value as an integer.
See also read_bytes()
"""
pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
value = 0
for i in range(8):
bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
value |= bit << i
return value
def writebyte(self, value):
return _ow.writebyte(self.pin, value)
def read_bytes(self, count):
"""
Read a sequence of N bytes.
The bytes are returned as a bytearray.
"""
s = bytearray(count)
for i in range(count):
s[i] = self.read_byte()
return s
def write(self, buf):
for b in buf:
_ow.writebyte(self.pin, b)
def select_rom(self, rom):
"""
Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
"""
assert len(rom) == 8, "ROM must be 8 bytes"
self.reset()
self.write_byte(0x55) # ROM MATCH
self.write_bytes(rom)
def read_rom(self):
"""
Read the ROM - this works if there is only a single device attached.
"""
self.reset()
self.write_byte(0x33) # READ ROM
rom = self.read_bytes(8)
# TODO: check CRC of the ROM
return rom
def skip_rom(self):
"""
Send skip-rom command - this works if there is only one device attached.
"""
self.write_byte(0xCC) # SKIP ROM
def depower(self):
self.data_pin.init(self.data_pin.IN, self.data_pin.PULL_NONE)
self.writebyte(MATCH_ROM)
self.write(rom)
def scan(self):
"""
Return a list of ROMs for all attached devices.
Each ROM is returned as a bytes object of 8 bytes.
"""
devices = []
self._reset_search()
while True:
rom = self._search()
if not rom:
return devices
devices.append(rom)
def _reset_search(self):
self.last_discrepancy = 0
self.last_device_flag = False
self.last_family_discrepancy = 0
self.rom = bytearray(8)
def _search(self):
# initialize for search
id_bit_number = 1
last_zero = 0
rom_byte_number = 0
rom_byte_mask = 1
search_result = 0
pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP = self.cache
# if the last call was not the last one
if not self.last_device_flag:
# 1-Wire reset
if not self.reset():
self._reset_search()
return None
# issue the search command
self.write_byte(0xF0)
# loop to do the search
while rom_byte_number < 8: # loop until through all ROM bytes 0-7
# read a bit and its complement
id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
cmp_id_bit = self._read_bit(pin_init, pin_value, Pin_OUT_PP, Pin_IN, Pin_PULL_UP)
# check for no devices on 1-wire
if (id_bit == 1) and (cmp_id_bit == 1):
break
diff = 65
rom = False
for i in range(0xff):
rom, diff = self._search_rom(rom, diff)
if rom:
devices += [rom]
if diff == 0:
break
return devices
def _search_rom(self, l_rom, diff):
if not self.reset():
return None, 0
self.writebyte(SEARCH_ROM)
if not l_rom:
l_rom = bytearray(8)
rom = bytearray(8)
next_diff = 0
i = 64
for byte in range(8):
r_b = 0
for bit in range(8):
b = self.readbit()
if self.readbit():
if b: # there are no devices or there is an error on the bus
return None, 0
else:
# all devices coupled have 0 or 1
if (id_bit != cmp_id_bit):
search_direction = id_bit # bit write value for search
else:
# if this discrepancy if before the Last Discrepancy
# on a previous next then pick the same as last time
if (id_bit_number < self.last_discrepancy):
search_direction = (self.rom[rom_byte_number] & rom_byte_mask) > 0
else:
# if equal to last pick 1, if not then pick 0
search_direction = (id_bit_number == self.last_discrepancy)
# if 0 was picked then record its position in LastZero
if search_direction == 0:
last_zero = id_bit_number
# check for Last discrepancy in family
if last_zero < 9:
self.last_family_discrepancy = last_zero
# set or clear the bit in the ROM byte rom_byte_number
# with mask rom_byte_mask
if search_direction == 1:
self.rom[rom_byte_number] |= rom_byte_mask
else:
self.rom[rom_byte_number] &= ~rom_byte_mask
# serial number search direction write bit
#print('sd', search_direction)
self.write_bit(search_direction)
# increment the byte counter id_bit_number
# and shift the mask rom_byte_mask
id_bit_number += 1
rom_byte_mask <<= 1
# if the mask is 0 then go to new SerialNum byte rom_byte_number and reset mask
if rom_byte_mask == 0x100:
rom_byte_number += 1
rom_byte_mask = 1
# if the search was successful then
if not (id_bit_number < 65):
# search successful so set last_discrepancy,last_device_flag,search_result
self.last_discrepancy = last_zero
# check for last device
if self.last_discrepancy == 0:
self.last_device_flag = True
search_result = True
# if no device found then reset counters so next 'search' will be like a first
if not search_result or not self.rom[0]:
self._reset_search()
return None
else:
return bytes(self.rom)
if not b: # collision, two devices with different bit meaning
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
b = 1
next_diff = i
self.writebit(b)
if b:
r_b |= 1 << bit
i -= 1
rom[byte] = r_b
return rom, next_diff
def crc8(self, data):
return _ow.crc8(data)

@ -1,51 +0,0 @@
# DS18x20 temperature sensor driver for MicroPython.
# MIT license; Copyright (c) 2016 Damien P. George
from micropython import const
_CONVERT = const(0x44)
_RD_SCRATCH = const(0xbe)
_WR_SCRATCH = const(0x4e)
class DS18X20:
def __init__(self, onewire):
self.ow = onewire
self.buf = bytearray(9)
def scan(self):
return [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
def convert_temp(self):
self.ow.reset(True)
self.ow.writebyte(self.ow.SKIP_ROM)
self.ow.writebyte(_CONVERT)
def read_scratch(self, rom):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_RD_SCRATCH)
self.ow.readinto(self.buf)
if self.ow.crc8(self.buf):
raise Exception('CRC error')
return self.buf
def write_scratch(self, rom, buf):
self.ow.reset(True)
self.ow.select_rom(rom)
self.ow.writebyte(_WR_SCRATCH)
self.ow.write(buf)
def read_temp(self, rom):
buf = self.read_scratch(rom)
if rom[0] == 0x10:
if buf[1]:
t = buf[0] >> 1 | 0x80
t = -((~t + 1) & 0xff)
else:
t = buf[0] >> 1
return t - 0.25 + (buf[7] - buf[6]) / buf[7]
else:
t = buf[1] << 8 | buf[0]
if t & 0x8000: # sign bit set
t = -((t ^ 0xffff) + 1)
return t / 16

@ -1,91 +0,0 @@
# 1-Wire driver for MicroPython on ESP8266
# MIT license; Copyright (c) 2016 Damien P. George
from micropython import const
import _onewire as _ow
class OneWireError(Exception):
pass
class OneWire:
SEARCH_ROM = const(0xf0)
MATCH_ROM = const(0x55)
SKIP_ROM = const(0xcc)
def __init__(self, pin):
self.pin = pin
self.pin.init(pin.OPEN_DRAIN)
def reset(self, required=False):
reset = _ow.reset(self.pin)
if required and not reset:
raise OneWireError
return reset
def readbit(self):
return _ow.readbit(self.pin)
def readbyte(self):
return _ow.readbyte(self.pin)
def readinto(self, buf):
for i in range(len(buf)):
buf[i] = _ow.readbyte(self.pin)
def writebit(self, value):
return _ow.writebit(self.pin, value)
def writebyte(self, value):
return _ow.writebyte(self.pin, value)
def write(self, buf):
for b in buf:
_ow.writebyte(self.pin, b)
def select_rom(self, rom):
self.reset()
self.writebyte(MATCH_ROM)
self.write(rom)
def scan(self):
devices = []
diff = 65
rom = False
for i in range(0xff):
rom, diff = self._search_rom(rom, diff)
if rom:
devices += [rom]
if diff == 0:
break
return devices
def _search_rom(self, l_rom, diff):
if not self.reset():
return None, 0
self.writebyte(SEARCH_ROM)
if not l_rom:
l_rom = bytearray(8)
rom = bytearray(8)
next_diff = 0
i = 64
for byte in range(8):
r_b = 0
for bit in range(8):
b = self.readbit()
if self.readbit():
if b: # there are no devices or there is an error on the bus
return None, 0
else:
if not b: # collision, two devices with different bit meaning
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
b = 1
next_diff = i
self.writebit(b)
if b:
r_b |= 1 << bit
i -= 1
rom[byte] = r_b
return rom, next_diff
def crc8(self, data):
return _ow.crc8(data)
Loading…
Cancel
Save