【昉·星光 2 RISC-V单板计算机】SHT40 和 AHT10 远程温湿度计
本文介绍了昉·星光2 (StarFive2)单板计算机通过 IIC 协议驱动 SHT40 和 AHT10 温湿度传感器,进一步连接 Home Assistant 实现远程数据采集和环境监控的项目设计。
项目介绍
本项目设计主要包括
- 准备工作:测试 VisionFive.gpio 和 IIC 通信协议;
- 驱动传感器:实现 IIC 驱动 SHT40 和 AHT10 传感器并打印采集的温湿度数据;
- Home Assistant 远程数据上传:数据通过 EMQX 和 MQTT 代理转发至 HA 平台,实现远程数据采集和监控。
准备工作
- 完成前面帖子描述的 VisionFive.gpio 安装和软件库更新,详见:昉·星光GPIO控制及软件库更新;
- 搭建 Home Assistant 智能家居平台,包括 Docker 安装、EMQX服务器平台部署等,详见:通过 MQTT 协议接入 Home Assistant .
驱动传感器
介绍了 SHT40 和 AHT10 两种温湿度传感器的驱动和数据采集。
SHT40
使用 IIC 通信驱动 SHT40 高精度温湿度传感器,实现数据的终端打印。
详见:SHT40 数字温湿度传感器 .
硬件连接
SHT40 | VisionFive2 - GPIO (1-40) |
---|---|
SCL | 5 |
SDA | 3 |
GND | 9 |
VCC | 1 |
代码
新建 I2C_Sensor_SHT40.py
文件,添加如下代码
import sys
import time
import fcntl
import struct
# I2C
I2C_SLAVE = 0x0703
SHT40_ADDR = 0x44
I2C_BUS = "/dev/i2c-0" # VisionFive2 I2C BUS
class SHT40:
def __init__(self, i2c_bus=I2C_BUS):
self.i2c_bus = i2c_bus
self.fd = None
def open(self):
try:
self.fd = open(self.i2c_bus, 'rb+', buffering=0)
fcntl.ioctl(self.fd, I2C_SLAVE, SHT40_ADDR)
return True
except Exception as e:
print(f"I2C init failed: {str(e)}")
return False
def close(self):
if self.fd:
self.fd.close()
def write(self, cmd):
try:
self.fd.write(bytes([cmd]))
except Exception as e:
print(f"Write failed: {str(e)}")
return False
return True
def read(self, length):
try:
return self.fd.read(length)
except Exception as e:
print(f"Read failed: {str(e)}")
return None
def measure(self, precision=0xFD):
""" 0xFD:high, 0xF6:middle, 0xE0:low """
if not self.write(precision):
return (None, None)
time.sleep(0.02)
data = self.read(6)
if not data or len(data) != 6:
return (None, None)
# CRC
if not self._check_crc(data[0:2], data[2]) or \
not self._check_crc(data[3:5], data[5]):
return (None, None)
#
temp = -45 + 175 * (int.from_bytes(data[0:2], 'big') / 65535.0)
hum = -6 + 125 * (int.from_bytes(data[3:5], 'big') / 65535.0)
hum = max(0.0, min(100.0, hum))
return (temp, hum)
def _check_crc(self, data, crc):
""" CRC-8 """
polynomial = 0x31
crc_result = 0xFF
for byte in data:
crc_result ^= byte
for _ in range(8):
if crc_result & 0x80:
crc_result = (crc_result << 1) ^ polynomial
else:
crc_result <<= 1
crc_result &= 0xFF
return crc_result == crc
def main():
sensor = SHT40()
if not sensor.open():
return 1
try:
while True:
temp, hum = sensor.measure()
if temp is not None and hum is not None:
print(f"Temperature: {temp:.2f} C, Humidity: {hum:.2f}%")
else:
print("Measurement failed")
time.sleep(1)
except KeyboardInterrupt:
print("\nExiting...")
finally:
sensor.close()
return 0
if __name__ == "__main__":
sys.exit(main())
保存代码,终端执行指令 python3 I2C_Sensor_SHT40.py
,程序运行,打印输出采集的传感器数据。
效果
终端输出采集的温湿度数据
AHT10
进一步实现同样是 IIC 通信的 AHT10 传感器的驱动,并终端打印温湿度数据。
详见:AHT10系列 - 广州奥松 .
硬件连接
AHT10 | VisionFive2 - GPIO (1-40) |
---|---|
SCL | 5 |
SDA | 3 |
GND | 9 |
VCC | 1 |
代码
新建 I2C_Sensor_AHT10.py
文件,添加如下代码
import sys
import time
import fcntl
# AHT10 device address
AHT10_I2C_ADDR = 0x38
# I2C control command
I2C_SLAVE = 0x0703
# VisionFive 2 default I2C BUS
I2C_BUS = "/dev/i2c-0"
class AHT10:
def __init__(self, i2c_bus=I2C_BUS):
self.i2c_bus = i2c_bus
self.fd = None
def open(self):
try:
self.fd = open(self.i2c_bus, 'rb+', buffering=0)
fcntl.ioctl(self.fd, I2C_SLAVE, AHT10_I2C_ADDR)
self._initialize_sensor()
return True
except Exception as e:
print(f"I2C initialized failed: {str(e)}")
return False
def close(self):
if self.fd:
self.fd.close()
def _initialize_sensor(self):
""" initialize AHT10 sensor """
# send initialized command
self._write_command([0xE1, 0x08, 0x00])
time.sleep(0.01) # wait for initialization complate
def _write_command(self, cmd_list):
""" write command """
try:
self.fd.write(bytes(cmd_list))
except Exception as e:
print(f"write failed: {str(e)}")
return False
return True
def _read_data(self, length):
""" read data from sensor """
try:
return self.fd.read(length)
except Exception as e:
print(f"read failed: {str(e)}")
return None
def trigger_measurement(self):
""" trigger measure """
return self._write_command([0xAC, 0x33, 0x00])
def read_status(self):
""" read register """
status = self._read_data(1)
return status[0] if status else None
def read_raw_data(self):
""" Read test data """
# trigger test
if not self.trigger_measurement():
return None
# wait for 80ms
timeout = 80 # ms
while timeout > 0:
status = self.read_status()
if status is None:
return None
if not (status & 0x80): # check flag
break
time.sleep(0.001)
timeout -= 1
if timeout <= 0:
print("timeout")
return None
# read 6-bit data
raw_data = self._read_data(6)
if not raw_data or len(raw_data) != 6:
return None
return raw_data
def get_temperature_humidity(self):
""" Get data """
raw_data = self.read_raw_data()
if not raw_data:
return (None, None)
# origin data
hum_raw = ((raw_data[1] << 12) | (raw_data[2] << 4) | (raw_data[3] >> 4))
temp_raw = (((raw_data[3] & 0x0F) << 16) | (raw_data[4] << 8) | raw_data[5])
# calculate
humidity = (hum_raw / 0x100000) * 100
temperature = (temp_raw / 0x100000) * 200 - 50
# humidity range
humidity = max(0.0, min(100.0, humidity))
temperature = max(-40.0, min(85.0, temperature))
return (temperature, humidity)
def main():
sensor = AHT10()
if not sensor.open():
return 1
try:
while True:
temp, hum = sensor.get_temperature_humidity()
if temp is not None and hum is not None:
print(f"Temperature: {temp:.2f} C, Humidity: {hum:.2f}%")
else:
print("Faild to get data")
time.sleep(1)
except KeyboardInterrupt:
print("\nExit project")
finally:
sensor.close()
return 0
if __name__ == "__main__":
sys.exit(main())
保存代码,终端执行指令 python3 I2C_Sensor_AHT10.py
运行采集数据并打印的程序。
效果
终端输出采集的温湿度数据
Home Assistant
在实现传感器驱动及串口打印的基础上,进一步实现 Home Assistant (HA) 智能家居平台的连接,实现远程温湿度计。
方案
实现数据上传至 Home Assistant 有多种方案,包括但不限于
- 使用 Home Assistant RESTful Sensor;
- 使用 Node-RED 作为中间件;
- 通过 Telegraf 和 InfluxDB;
- 直接写入 SQL 数据库;
- 使用 Syslog 集成;
- Socket 通过 EMQX MQTT 代理实现数据转发;
结合开发板资源以及官方软件库的支持范围,选择 Socket 和 EMQX 代理的方案。
流程图
代码
新建 aht10_emqx_auth
文件,添加如下代码
# aht10_emqx_auth.py
import socket
import json
import time
import struct
from I2C_Sensor_AHT10 import AHT10
EMQX_HOST = "192.168.1.121" # EMQX Server IP
EMQX_PORT = 1883 # MQTT default port
MQTT_USER = "LJL" # EMQX username
MQTT_PASS = "4421989g" # EMQX passward
CLIENT_ID = "vf2-aht10" # Client ID
TOPIC = "home/sensor/aht10" # MQTT topic
INTERVAL = 2 # delay
def connect_mqtt(sock):
"""MQTT"""
# MQTT protocol
protocol_name = b"MQTT"
protocol_level = 4 # MQTT 3.1.1
# top
connect_flags = 0xC2 #
keep_alive = 60 #
# load
client_id_bytes = CLIENT_ID.encode()
username_bytes = MQTT_USER.encode()
password_bytes = MQTT_PASS.encode()
# calculate length
remaining_length = (
2 + len(protocol_name) + 1 + 1 + 2 + #
2 + len(client_id_bytes) + #
2 + len(username_bytes) + #
2 + len(password_bytes) #
)
# build packet
packet = bytearray()
packet.append(0x10) #
# code
while remaining_length > 0:
byte = remaining_length % 128
remaining_length = remaining_length // 128
if remaining_length > 0:
byte = byte | 0x80
packet.append(byte)
# add protocol name
packet.extend(struct.pack("!H", len(protocol_name)))
packet.extend(protocol_name)
# add flag
packet.extend([protocol_level, connect_flags])
packet.extend(struct.pack("!H", keep_alive))
# add client id
packet.extend(struct.pack("!H", len(client_id_bytes)))
packet.extend(client_id_bytes)
# add user name
packet.extend(struct.pack("!H", len(username_bytes)))
packet.extend(username_bytes)
# add password
packet.extend(struct.pack("!H", len(password_bytes)))
packet.extend(password_bytes)
sock.sendall(packet)
resp = sock.recv(4)
if len(resp) < 4 or resp[0] != 0x20 or resp[3] != 0x00:
raise ConnectionError("MQTT authentication failed")
def publish_data(sock, temp, hum):
""" publish MQTT data """
payload = json.dumps({
"temperature": round(temp, 1),
"humidity": round(hum, 1)
}).encode()
# PUBLISH words
packet = bytearray()
packet.append(0x30) # PUBLISH QoS 0
# calculate remain length
remaining_length = 2 + len(TOPIC) + len(payload)
# just
temp_len = remaining_length
while temp_len > 0:
byte = temp_len % 128
temp_len = temp_len // 128
if temp_len > 0:
byte = byte | 0x80
packet.append(byte)
# message topic
packet.extend(struct.pack("!H", len(TOPIC)))
packet.extend(TOPIC.encode())
# message boday
packet.extend(payload)
sock.sendall(packet)
def main():
sensor = AHT10()
if not sensor.open():
return
while True:
try:
with socket.socket() as s:
s.connect((EMQX_HOST, EMQX_PORT))
connect_mqtt(s) # MQTT connect
print("Connected & authenticated to EMQX")
while True:
temp, hum = sensor.get_temperature_humidity()
if None not in (temp, hum):
publish_data(s, temp, hum)
print(f"Published: {temp} C, {hum}%")
time.sleep(INTERVAL)
except (ConnectionError, OSError) as e:
print(f"Connection error: {e}, retrying in 5s...")
time.sleep(5)
except KeyboardInterrupt:
break
finally:
sensor.close()
if __name__ == "__main__":
main()
保存代码,终端执行指令 aht10_emqx_auth.py
,程序运行。
测试
使用 MQTTX 软件作为 MQTT Broker, 测试 EMQX 服务器消息接收情况
- 新建连接,输入 EMQX 建立的客户端节点的 IP 地址,用户名和密码,连接至 EMQX 平台;
- 在此 MQTT Broker 新建对话主题 home/sensor/aht10 ,即可接收 EMQX 客户端发送的消息
网页端
将发送的 AHT10 数据添加至 Home Assistant 平台,实现网页端的 Home Assistant 传感器数据显示。
修改 configuration.yaml
配置文件
mqtt:
sensor:
- name: "AHT10/Temperature"
state_topic: "home/sensor/aht10"
suggested_display_precision: 1
unit_of_measurement: "C"
value_template: "{{ value_json.temperature }}"
- name: "AHT10/Humidity"
state_topic: "home/sensor/aht10"
suggested_display_precision: 1
unit_of_measurement: "%"
value_template: "{{ value_json.humidity }}"
应用配置并刷新网页,可在传感器标签页下找到 AHT10 传感器数据显示
分别点击 Humidity 和 Temperature 查看历史数据。
湿度历史曲线
温度历史曲线
APP
依托 HA 丰富的生态建设,在移动端可通过 Home Assistant 应用实现传感器数据的实时监控。
传感器连接开发板的实物图,显示了智能手机远程数据采集和环境温湿度监控
总结
本文介绍了昉·星光2 (StarFive2)单板计算机通过 IIC 协议驱动 SHT40 和 AHT10 温湿度传感器,进一步连接 Home Assistant 实现远程数据采集和环境监控的项目设计,为该开发板在物联网领域的应用提供了参考。

