This commit is contained in:
2025-02-26 23:12:36 +01:00
commit 90f12c5cef
5 changed files with 284 additions and 0 deletions

179
CO2Meter.py Normal file
View File

@@ -0,0 +1,179 @@
"""
Module for reading out CO2Meter USB devices
via a hidraw device under Linux
"""
import sys
import fcntl
import threading
import weakref
CO2METER_CO2 = 0x50
CO2METER_TEMP = 0x42
CO2METER_HUM = 0x41
HIDIOCSFEATURE_9 = 0xC0094806
def _co2_worker(weak_self):
"""
Worker thread that constantly reads from the usb device.
"""
while True:
self = weak_self()
if self is None:
break
self._read_data()
if not self._running:
break
del self
class CO2Meter:
_key = [0xc4, 0xc6, 0xc0, 0x92, 0x40, 0x23, 0xdc, 0x96]
_device = ""
_values = {}
_file = ""
_running = True
_callback = None
def __init__(self, device="/dev/hidraw0", callback=None):
self._device = device
self._callback = callback
self._file = open(device, "a+b", 0)
if sys.version_info >= (3,):
set_report = [0] + self._key
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, bytearray(set_report))
else:
set_report_str = "\x00" + "".join(chr(e) for e in self._key)
fcntl.ioctl(self._file, HIDIOCSFEATURE_9, set_report_str)
thread = threading.Thread(target=_co2_worker, args=(weakref.ref(self),))
thread.daemon = True
thread.start()
def _read_data(self):
"""
Function that reads from the device, decodes it, validates the checksum
and adds the data to the dict _values.
Additionally calls the _callback if set
"""
try:
result = self._file.read(8)
if sys.version_info >= (3,):
data = list(result)
else:
data = list(ord(e) for e in result)
if data[4] != 0x0d:
""" newer devices don't encrypt the data, if byte 4!=0x0d assume encrypted data """
data = self._decrypt(data)
if data[4] != 0x0d or (sum(data[:3]) & 0xff) != data[3]:
print(self._hd(data), "Checksum error")
return
operation = data[0]
val = data[1] << 8 | data[2]
self._values[operation] = self._convert_value(operation, val)
if self._callback is not None:
if operation in {CO2METER_CO2, CO2METER_TEMP} or (operation == CO2METER_HUM and val != 0):
self._callback(sensor=operation, value=val)
except:
self._running = False
def _decrypt(self, data):
"""
The received data has some weak crypto that needs to be decoded first
"""
cstate = [0x48, 0x74, 0x65, 0x6D, 0x70, 0x39, 0x39, 0x65]
shuffle = [2, 4, 0, 7, 1, 6, 5, 3]
phase1 = [0] * 8
for i, j in enumerate(shuffle):
phase1[j] = data[i]
phase2 = [0] * 8
for i in range(8):
phase2[i] = phase1[i] ^ self._key[i]
phase3 = [0] * 8
for i in range(8):
phase3[i] = ((phase2[i] >> 3) | (phase2[(i - 1 + 8) % 8] << 5)) & 0xff
ctmp = [0] * 8
for i in range(8):
ctmp[i] = ((cstate[i] >> 4) | (cstate[i] << 4)) & 0xff
out = [0] * 8
for i in range(8):
out[i] = (0x100 + phase3[i] - ctmp[i]) & 0xff
return out
@staticmethod
def _convert_value(sensor, value):
""" Apply Conversion of value dending on sensor type """
if sensor == CO2METER_TEMP:
return round(value / 16.0 - 273.1, 1)
if sensor == CO2METER_HUM:
return round(value / 100.0, 1)
return value
@staticmethod
def _hd(data):
""" Helper function for printing the raw data """
return " ".join("%02X" % e for e in data)
def get_co2(self):
"""
read the co2 value from _values
:returns dict with value or empty
"""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_CO2 in self._values:
result = {'co2': self._values[CO2METER_CO2]}
return result
def get_temperature(self):
"""
reads the temperature from _values
:returns dict with value or empty
"""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_TEMP in self._values:
result = {'temperature': self._values[CO2METER_TEMP]}
return result
def get_humidity(self): # not implemented by all devices
"""
reads the humidty from _values.
not all devices support this but might still return a value 0.
So values of 0 are discarded.
:returns dict with value or empty
"""
if not self._running:
raise IOError("worker thread couldn't read data")
result = {}
if CO2METER_HUM in self._values and self._values[CO2METER_HUM] != 0:
result = {'humidity': self._values[CO2METER_HUM]}
return result
def get_data(self):
"""
get all currently available values
:returns dict with value or empty
"""
result = {}
result.update(self.get_co2())
result.update(self.get_temperature())
result.update(self.get_humidity())
return result

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Michael Heinemann
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

64
README.md Normal file
View File

@@ -0,0 +1,64 @@
# CO2Meter
Python Module to use co2meters like the 'AirCO2ntrol Mini' from TFA Dostmann with USB ID 04d9:a052. There are also other modules using the same interface.
This module supports Python 2.7 and 3.x.
## Attribution
Reverse Engineering of the protocol and initial code done by [Henryk Plötz](https://github.com/henryk).
Read all about it at [hackaday](https://hackaday.io/project/5301-reverse-engineering-a-low-cost-usb-co-monitor)
Code derived from [this article](https://hackaday.io/project/5301-reverse-engineering-a-low-cost-usb-co-monitor/log/17909-all-your-base-are-belong-to-us)
## Install
With pip:
```bash
pip install git+https://github.com/heinemml/CO2Meter
```
Without pip:
```bash
python setup.py install
```
Remark: you don't need to install, you can also just copy the CO2Meter.py into your project.
If you don't want to run your script as root make sure you have sufficient rights to access the device file.
This udev rule can be used to set permissions.
```
ACTION=="remove", GOTO="co2mini_end"
SUBSYSTEMS=="usb", KERNEL=="hidraw*", ATTRS{idVendor}=="04d9", ATTRS{idProduct}=="a052", GROUP="plugdev", MODE="0660", SYMLINK+="co2mini%n", GOTO="co2mini_end"
LABEL="co2mini_end"
```
save it as `/etc/udev/rules.d/90-co2mini.rules` and add the script user to the group `plugdev`.
This rules make the device also available as co2mini0 (increase trailing number for each additional device).
## Usage
```python
from CO2Meter import *
import time
sensor = CO2Meter("/dev/hidraw0")
while True:
time.sleep(2)
sensor.get_data()
```
The device writes out one value at a time. So we need to parse some data until we have co2 and temperature. Thus the get_data() method will initially return none or only one value (whichever comes first).
When you just need one measurement you should wait some seconds or iterate until you get a full reading. If you just need co2 a call to `get_co2` might speed things up.
### Callback
You can pass a callback to the constructor. It will be called when any of the values is updated. The parameters passed are `sensor` and `value`. `sensor` contains one of these constants:
```python
CO2METER_CO2 = 0x50
CO2METER_TEMP = 0x42
CO2METER_HUM = 0x41
```
### Error handling
In Case the device can't be read anymore (e.g. it was unplugged) the worker thread will end in the background. Afterwards calls to any of the `get_*` functions will throw an `IOError`. You will need to handle any resetup, making sure that the device is there etc yourself.

12
example.py Normal file
View File

@@ -0,0 +1,12 @@
#!/bin/env python
import time
from datetime import datetime
from CO2Meter import *
Meter = CO2Meter("/dev/hidraw0")
while True:
measurement = Meter.get_data()
measurement.update({'timestamp': datetime.now()})
print(measurement)
time.sleep(5)

8
setup.py Normal file
View File

@@ -0,0 +1,8 @@
from distutils.core import setup
setup(name='CO2Meter',
version='2.3',
py_modules=['CO2Meter'],
url='https://github.com/heinemml/CO2Meter',
description='Library to access USB CO2Meters'
)