From 40f4b8ce4ee71ddfd8060f6f555dbe1b21d644f5 Mon Sep 17 00:00:00 2001 From: ahmad98 Date: Sat, 4 Dec 2021 10:45:07 -0800 Subject: [PATCH] fix high cpu load problem + apply flake8 rules + fix is syntax --- hx711.py | 185 ++++++++++++++++++++++++------------------------------- 1 file changed, 80 insertions(+), 105 deletions(-) diff --git a/hx711.py b/hx711.py index 296b913..db0db4f 100644 --- a/hx711.py +++ b/hx711.py @@ -5,7 +5,6 @@ import threading - class HX711: def __init__(self, dout, pd_sck, gain=128): @@ -16,7 +15,7 @@ def __init__(self, dout, pd_sck, gain=128): # Mutex for reading from the HX711, in case multiple threads in client # software try to access get values from the class at the same time. self.readLock = threading.Lock() - + GPIO.setmode(GPIO.BCM) GPIO.setup(self.PD_SCK, GPIO.OUT) GPIO.setup(self.DOUT, GPIO.IN) @@ -42,21 +41,18 @@ def __init__(self, dout, pd_sck, gain=128): # Think about whether this is necessary. time.sleep(1) - def convertFromTwosComplement24bit(self, inputValue): return -(inputValue & 0x800000) + (inputValue & 0x7fffff) - def is_ready(self): return GPIO.input(self.DOUT) == 0 - def set_gain(self, gain): - if gain is 128: + if gain == 128: self.GAIN = 1 - elif gain is 64: + elif gain == 64: self.GAIN = 3 - elif gain is 32: + elif gain == 32: self.GAIN = 2 GPIO.output(self.PD_SCK, False) @@ -64,7 +60,6 @@ def set_gain(self, gain): # Read out a set of raw bytes and throw it away. self.readRawBytes() - def get_gain(self): if self.GAIN == 1: return 128 @@ -75,36 +70,33 @@ def get_gain(self): # Shouldn't get here. return 0 - def readNextBit(self): - # Clock HX711 Digital Serial Clock (PD_SCK). DOUT will be - # ready 1us after PD_SCK rising edge, so we sample after - # lowering PD_SCL, when we know DOUT will be stable. - GPIO.output(self.PD_SCK, True) - GPIO.output(self.PD_SCK, False) - value = GPIO.input(self.DOUT) - - # Convert Boolean to int and return it. - return int(value) + # Clock HX711 Digital Serial Clock (PD_SCK). DOUT will be + # ready 1us after PD_SCK rising edge, so we sample after + # lowering PD_SCL, when we know DOUT will be stable. + GPIO.output(self.PD_SCK, True) + GPIO.output(self.PD_SCK, False) + value = GPIO.input(self.DOUT) + # Convert Boolean to int and return it. + return int(value) def readNextByte(self): - byteValue = 0 - - # Read bits and build the byte from top, or bottom, depending - # on whether we are in MSB or LSB bit mode. - for x in range(8): - if self.bit_format == 'MSB': - byteValue <<= 1 - byteValue |= self.readNextBit() - else: - byteValue >>= 1 - byteValue |= self.readNextBit() * 0x80 - - # Return the packed byte. - return byteValue - + byteValue = 0 + + # Read bits and build the byte from top, or bottom, depending + # on whether we are in MSB or LSB bit mode. + for x in range(8): + if self.bit_format == 'MSB': + byteValue <<= 1 + byteValue |= self.readNextBit() + else: + byteValue >>= 1 + byteValue |= self.readNextBit() * 0x80 + + # Return the packed byte. + return byteValue def readRawBytes(self): # Wait for and get the Read Lock, incase another thread is already @@ -113,49 +105,49 @@ def readRawBytes(self): # Wait until HX711 is ready for us to read a sample. while not self.is_ready(): - pass + time.sleep(0.01) # Read three bytes of data from the HX711. - firstByte = self.readNextByte() + firstByte = self.readNextByte() secondByte = self.readNextByte() - thirdByte = self.readNextByte() + thirdByte = self.readNextByte() # HX711 Channel and gain factor are set by number of bits read # after 24 data bits. for i in range(self.GAIN): - # Clock a bit out of the HX711 and throw it away. - self.readNextBit() + # Clock a bit out of the HX711 and throw it away. + self.readNextBit() # Release the Read Lock, now that we've finished driving the HX711 # serial interface. - self.readLock.release() + self.readLock.release() # Depending on how we're configured, return an orderd list of raw byte # values. if self.byte_format == 'LSB': - return [thirdByte, secondByte, firstByte] + return [thirdByte, secondByte, firstByte] else: - return [firstByte, secondByte, thirdByte] - + return [firstByte, secondByte, thirdByte] def read_long(self): # Get a sample from the HX711 in the form of raw bytes. dataBytes = self.readRawBytes() - if self.DEBUG_PRINTING: print(dataBytes,) - + # Join the raw bytes into a single 24bit 2s complement value. twosComplementValue = ((dataBytes[0] << 16) | - (dataBytes[1] << 8) | + (dataBytes[1] << 8) | dataBytes[2]) if self.DEBUG_PRINTING: print("Twos: 0x%06x" % twosComplementValue) - + # Convert from 24bit twos-complement to a signed value. - signedIntValue = self.convertFromTwosComplement24bit(twosComplementValue) + signedIntValue = self.convertFromTwosComplement24bit( + twosComplementValue + ) # Record the latest sample value we've read. self.lastVal = signedIntValue @@ -163,7 +155,6 @@ def read_long(self): # Return the sample value we've read from the HX711. return int(signedIntValue) - def read_average(self, times=3): # Make sure we've been asked to take a rational amount of samples. if times <= 0: @@ -178,8 +169,8 @@ def read_average(self, times=3): if times < 5: return self.read_median(times) - # If we're taking a lot of samples, we'll collect them in a list, remove - # the outliers, then take the mean of the remaining set. + # If we're taking a lot of samples, we'll collect them in a list, + # remove the outliers, then take the mean of the remaining set. valueList = [] for x in range(times): @@ -187,7 +178,8 @@ def read_average(self, times=3): valueList.sort() - # We'll be trimming 20% of outlier samples from top and bottom of collected set. + # We'll be trimming 20% of outlier samples from + # top and bottom of collected set. trimAmount = int(len(valueList) * 0.2) # Trim the edge case values. @@ -196,43 +188,42 @@ def read_average(self, times=3): # Return the mean of remaining samples. return sum(valueList) / len(valueList) - - # A median-based read method, might help when getting random value spikes + # A median-based read method, + # might help when getting random value spikes # for unknown or CPU-related reasons def read_median(self, times=3): - if times <= 0: - raise ValueError("HX711::read_median(): times must be greater than zero!") - - # If times == 1, just return a single reading. - if times == 1: - return self.read_long() + if times <= 0: + raise ValueError( + "HX711::read_median(): times must be greater than zero!" + ) - valueList = [] + # If times == 1, just return a single reading. + if times == 1: + return self.read_long() - for x in range(times): - valueList += [self.read_long()] + valueList = [] - valueList.sort() + for x in range(times): + valueList += [self.read_long()] - # If times is odd we can just take the centre value. - if (times & 0x1) == 0x1: - return valueList[len(valueList) // 2] - else: - # If times is even we have to take the arithmetic mean of - # the two middle values. - midpoint = len(valueList) / 2 - return sum(valueList[midpoint:midpoint+2]) / 2.0 + valueList.sort() + # If times is odd we can just take the centre value. + if (times & 0x1) == 0x1: + return valueList[len(valueList) // 2] + else: + # If times is even we have to take the arithmetic mean of + # the two middle values. + midpoint = len(valueList) / 2 + return sum(valueList[midpoint:midpoint+2]) / 2.0 # Compatibility function, uses channel A version def get_value(self, times=3): return self.get_value_A(times) - def get_value_A(self, times=3): return self.read_median(times) - self.get_offset_A() - def get_value_B(self, times=3): # for channel B, we need to set_gain(32) g = self.get_gain() @@ -245,7 +236,6 @@ def get_value_B(self, times=3): def get_weight(self, times=3): return self.get_weight_A(times) - def get_weight_A(self, times=3): value = self.get_value_A(times) value = value / self.REFERENCE_UNIT @@ -256,22 +246,20 @@ def get_weight_B(self, times=3): value = value / self.REFERENCE_UNIT_B return value - # Sets tare for channel A for compatibility purposes def tare(self, times=15): return self.tare_A(times) - - + def tare_A(self, times=15): # Backup REFERENCE_UNIT value backupReferenceUnit = self.get_reference_unit_A() self.set_reference_unit_A(1) - + value = self.read_average(times) if self.DEBUG_PRINTING: print("Tare A value:", value) - + self.set_offset_A(value) # Restore the reference unit, now that we've got our offset. @@ -279,7 +267,6 @@ def tare_A(self, times=15): return value - def tare_B(self, times=15): # Backup REFERENCE_UNIT value backupReferenceUnit = self.get_reference_unit_B() @@ -293,17 +280,15 @@ def tare_B(self, times=15): if self.DEBUG_PRINTING: print("Tare B value:", value) - + self.set_offset_B(value) # Restore gain/channel/reference unit settings. self.set_gain(backupGain) self.set_reference_unit_B(backupReferenceUnit) - - return value + return value - def set_reading_format(self, byte_format="LSB", bit_format="MSB"): if byte_format == "LSB": self.byte_format = byte_format @@ -319,9 +304,6 @@ def set_reading_format(self, byte_format="LSB", bit_format="MSB"): else: raise ValueError("Unrecognised bitformat: \"%s\"" % bit_format) - - - # sets offset for channel A for compatibility reasons def set_offset(self, offset): self.set_offset_A(offset) @@ -341,42 +323,38 @@ def get_offset_A(self): def get_offset_B(self): return self.OFFSET_B - - def set_reference_unit(self, reference_unit): self.set_reference_unit_A(reference_unit) - def set_reference_unit_A(self, reference_unit): # Make sure we aren't asked to use an invalid reference unit. if reference_unit == 0: - raise ValueError("HX711::set_reference_unit_A() can't accept 0 as a reference unit!") - return + raise ValueError( + "HX711::set_reference_unit_A() " + "can't accept 0 as a reference unit!" + ) self.REFERENCE_UNIT = reference_unit - def set_reference_unit_B(self, reference_unit): # Make sure we aren't asked to use an invalid reference unit. if reference_unit == 0: - raise ValueError("HX711::set_reference_unit_A() can't accept 0 as a reference unit!") - return + raise ValueError( + "HX711::set_reference_unit_A() " + "can't accept 0 as a reference unit!" + ) self.REFERENCE_UNIT_B = reference_unit - def get_reference_unit(self): return get_reference_unit_A() - def get_reference_unit_A(self): return self.REFERENCE_UNIT - def get_reference_unit_B(self): return self.REFERENCE_UNIT_B - - + def power_down(self): # Wait for and get the Read Lock, incase another thread is already # driving the HX711 serial interface. @@ -392,8 +370,7 @@ def power_down(self): # Release the Read Lock, now that we've finished driving the HX711 # serial interface. - self.readLock.release() - + self.readLock.release() def power_up(self): # Wait for and get the Read Lock, incase another thread is already @@ -417,10 +394,8 @@ def power_up(self): if self.get_gain() != 128: self.readRawBytes() - def reset(self): self.power_down() self.power_up() - # EOF - hx711.py