88 lines
2.5 KiB
Python
88 lines
2.5 KiB
Python
import adafruit_husb238
|
|
import time
|
|
|
|
class HUSB238:
    """Convenience wrapper around ``adafruit_husb238.Adafruit_HUSB238``.

    The driver object is created lazily by :meth:`connect` and is dropped
    again (``_pd`` reset to ``None``) whenever reading its ``attached``
    property raises, so callers can simply poll :meth:`is_connected` or
    call :meth:`connect` again.

    NOTE(review): ``wattage`` is computed as ``voltage * current``; this
    assumes the driver reports volts and amps -- confirm against the
    driver documentation.
    """

    # Underlying driver instance; stays None until connect() succeeds.
    _pd = None

    def __init__(self, i2c):
        """Remember the I2C bus object; the driver is not created here."""
        self._i2c = i2c

    def connect(self):
        """Create the driver if needed and report whether it is usable.

        Returns:
            The driver's ``attached`` value when a working driver already
            exists, ``True`` when a new driver object was constructed, and
            ``False`` when construction or the ``attached`` read raised.
        """
        if self.is_connected():
            try:
                return self._pd.attached
            except Exception:
                # Broad on purpose: the exception types raised by the
                # driver / bus are not visible from this file.
                return False
        try:
            self._pd = adafruit_husb238.Adafruit_HUSB238(self._i2c)
            return True
        except Exception:
            self._pd = None
            return False

    def is_connected(self):
        """Return the driver's ``attached`` value, or ``False`` if unusable.

        If reading ``attached`` raises, the driver object is discarded so a
        later :meth:`connect` starts from scratch.
        """
        if self._pd is None:
            return False
        try:
            return self._pd.attached
        except Exception:
            self._pd = None
            return False

    def attempt_wattage(self, watts, settle_time=1):
        """Step through the advertised voltages until ``watts`` is reached.

        Args:
            watts: Minimum ``voltage * current`` product wanted.
            settle_time: Seconds to wait after requesting a new voltage
                before reading it back (default 1, the original fixed
                delay).

        Returns:
            A ``(success, wattage)`` tuple. ``(False, 0)`` is returned when
            the device is not connected or the connection is lost while
            stepping.
        """
        if not self.is_connected():
            return False, 0

        present_wattage = self.wattage
        if present_wattage >= watts:
            return True, present_wattage

        voltages = self.available_voltages
        try:
            # Continue with the entry that follows the present voltage.
            start = voltages.index(self.voltage) + 1
        except ValueError:
            # The present voltage is not in the advertised list (or the
            # list is empty): try every advertised voltage from the start
            # instead of crashing.
            start = 0

        for target in voltages[start:]:
            if self.wattage >= watts:
                break
            if not self.is_connected():
                print('Connection lost')
                return False, 0
            self.voltage = target
            time.sleep(settle_time)
            # Read back through the guarded property so that a link that
            # dropped during the wait is reported as a failed switch
            # rather than raising AttributeError on a None driver.
            if self.voltage != target:
                print(f'Voltage did not switch to {target}v')
                return False, self.wattage

        resulting_wattage = self.wattage
        return resulting_wattage >= watts, resulting_wattage

    @property
    def wattage(self):
        """Product of the current ``voltage`` and ``current`` readings."""
        return self.voltage * self.current

    @property
    def available_voltages(self):
        """Driver's ``available_voltages``, or ``[]`` when not connected."""
        return self._safe_pd_call(lambda: self._pd.available_voltages, [])

    @property
    def voltage(self):
        """Driver's ``voltage`` reading, or ``0`` when not connected."""
        return self._safe_pd_call(lambda: self._pd.voltage, 0)

    @voltage.setter
    def voltage(self, value):
        # No connection guard here: with no driver object this raises
        # AttributeError, so callers should check is_connected() first.
        self._pd.voltage = value

    @property
    def current(self):
        """Driver's ``current`` reading, or ``0`` when not connected."""
        return self._safe_pd_call(lambda: self._pd.current, 0)

    def _unsafe_pd_call(self, func, default_val, attempts=20, delay=0.5):
        """Call ``func`` until it succeeds, at most ``attempts`` times.

        Args:
            func: Zero-argument callable to invoke.
            default_val: Value returned when every attempt raised.
            attempts: Maximum number of calls to make.
            delay: Seconds to pause between failed attempts.

        Returns:
            The first successful result of ``func()``, else ``default_val``.
        """
        for attempt in range(attempts):
            try:
                return func()
            except Exception:
                print('unsafe waiting')
                # Pause only when another attempt will follow.
                if attempt + 1 < attempts:
                    time.sleep(delay)
        return default_val

    def _safe_pd_call(self, func, default_val):
        """Return ``func()`` when connected, otherwise ``default_val``.

        Only the connection check is guarded; an exception raised by
        ``func`` itself propagates to the caller.
        """
        if self.is_connected():
            return func()
        return default_val