【昉·星光 2 RISC-V单板计算机】SHT40 和 AHT10 远程温湿度计
分享作者:lijinlei
评测品牌:赛昉科技
评测型号:VF202040-A0
发布时间:2025-05-28 11:40:16
0
前言
介绍了昉·星光2 (StarFive2)单板计算机通过 IIC 协议驱动 SHT40 和 AHT10 温湿度传感器,进一步连接 Home Assistant 实现远程数据采集和环境监控的项目设计。
开源口碑分享内容

【昉·星光 2 RISC-V单板计算机】SHT40 和 AHT10 远程温湿度计

本文介绍了昉·星光2 (StarFive2)单板计算机通过 IIC 协议驱动 SHT40 和 AHT10 温湿度传感器,进一步连接 Home Assistant 实现远程数据采集和环境监控的项目设计。

项目介绍

本项目设计主要包括

  • 准备工作:测试 VisionFive.gpio 和 IIC 通信协议;
  • 驱动传感器:实现 IIC 驱动 SHT40 和 AHT10 传感器并打印采集的温湿度数据;
  • Home Assistant 远程数据上传:数据通过 EMQX 和 MQTT 代理转发至 HA 平台,实现远程数据采集和监控。

准备工作

  • 完成前面帖子描述的 VisionFive.gpio 安装和软件库更新,详见:昉·星光GPIO控制及软件库更新;
  • 搭建 Home Assistant 智能家居平台,包括 Docker 安装、EMQX服务器平台部署等,详见:通过 MQTT 协议接入 Home Assistant .

驱动传感器

介绍了 SHT40 和 AHT10 两种温湿度传感器的驱动和数据采集。

SHT40

使用 IIC 通信驱动 SHT40 高精度温湿度传感器,实现数据的终端打印。

详见:SHT40 数字温湿度传感器 .

硬件连接


SHT40VisionFive2 - GPIO (1-40)
SCL5
SDA3
GND9
VCC1


代码

新建 I2C_Sensor_SHT40.py 文件,添加如下代码

import sys
import time
import fcntl
import struct

# I2C
I2C_SLAVE = 0x0703
SHT40_ADDR = 0x44
I2C_BUS = "/dev/i2c-0"  # VisionFive2 I2C BUS

class SHT40:
    def __init__(self, i2c_bus=I2C_BUS):
        self.i2c_bus = i2c_bus
        self.fd = None
        
    def open(self):
        try:
            self.fd = open(self.i2c_bus, 'rb+', buffering=0)
            fcntl.ioctl(self.fd, I2C_SLAVE, SHT40_ADDR)
            return True
        except Exception as e:
            print(f"I2C init failed: {str(e)}")
            return False
    
    def close(self):
        if self.fd:
            self.fd.close()
    
    def write(self, cmd):
        try:
            self.fd.write(bytes([cmd]))
        except Exception as e:
            print(f"Write failed: {str(e)}")
            return False
        return True
    
    def read(self, length):
        try:
            return self.fd.read(length)
        except Exception as e:
            print(f"Read failed: {str(e)}")
            return None
    
    def measure(self, precision=0xFD):
        """ 0xFD:high, 0xF6:middle, 0xE0:low """
        if not self.write(precision):
            return (None, None)
        
        time.sleep(0.02)
        
        data = self.read(6)
        if not data or len(data) != 6:
            return (None, None)
        
        # CRC
        if not self._check_crc(data[0:2], data[2]) or \
           not self._check_crc(data[3:5], data[5]):
            return (None, None)
        
        # 
        temp = -45 + 175 * (int.from_bytes(data[0:2], 'big') / 65535.0)
        hum = -6 + 125 * (int.from_bytes(data[3:5], 'big') / 65535.0)
        hum = max(0.0, min(100.0, hum))
        
        return (temp, hum)
    
    def _check_crc(self, data, crc):
        """ CRC-8 """
        polynomial = 0x31
        crc_result = 0xFF
        
        for byte in data:
            crc_result ^= byte
            for _ in range(8):
                if crc_result & 0x80:
                    crc_result = (crc_result << 1) ^ polynomial
                else:
                    crc_result <<= 1
                crc_result &= 0xFF
        
        return crc_result == crc

def main():
    sensor = SHT40()
    
    if not sensor.open():
        return 1
    
    try:
        while True:
            temp, hum = sensor.measure()
            if temp is not None and hum is not None:
                print(f"Temperature: {temp:.2f} C, Humidity: {hum:.2f}%")
            else:
                print("Measurement failed")
            
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\nExiting...")
    
    finally:
        sensor.close()
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

保存代码,终端执行指令 python3 I2C_Sensor_SHT40.py ,程序运行,打印输出采集的传感器数据。

效果

终端输出采集的温湿度数据

AHT10

进一步实现同样是 IIC 通信的 AHT10 传感器的驱动,并终端打印温湿度数据。

详见:AHT10系列 - 广州奥松 .

硬件连接


AHT10VisionFive2 - GPIO (1-40)
SCL5
SDA3
GND9
VCC1


代码

新建 I2C_Sensor_AHT10.py 文件,添加如下代码

import sys
import time
import fcntl

# AHT10 device address
AHT10_I2C_ADDR = 0x38
# I2C control command
I2C_SLAVE = 0x0703
# VisionFive 2 default I2C BUS
I2C_BUS = "/dev/i2c-0"

class AHT10:
    def __init__(self, i2c_bus=I2C_BUS):
        self.i2c_bus = i2c_bus
        self.fd = None
        
    def open(self):
        try:
            self.fd = open(self.i2c_bus, 'rb+', buffering=0)
            fcntl.ioctl(self.fd, I2C_SLAVE, AHT10_I2C_ADDR)
            self._initialize_sensor()
            return True
        except Exception as e:
            print(f"I2C initialized failed: {str(e)}")
            return False
    
    def close(self):
        if self.fd:
            self.fd.close()
    
    def _initialize_sensor(self):
        """ initialize AHT10 sensor """
        # send initialized command
        self._write_command([0xE1, 0x08, 0x00])
        time.sleep(0.01)  # wait for initialization complate
    
    def _write_command(self, cmd_list):
        """ write command """
        try:
            self.fd.write(bytes(cmd_list))
        except Exception as e:
            print(f"write failed: {str(e)}")
            return False
        return True
    
    def _read_data(self, length):
        """ read data from sensor """
        try:
            return self.fd.read(length)
        except Exception as e:
            print(f"read failed: {str(e)}")
            return None
    
    def trigger_measurement(self):
        """ trigger measure """
        return self._write_command([0xAC, 0x33, 0x00])
    
    def read_status(self):
        """ read register """
        status = self._read_data(1)
        return status[0] if status else None
    
    def read_raw_data(self):
        """ Read test data """
        # trigger test
        if not self.trigger_measurement():
            return None
        
        # wait for 80ms
        timeout = 80  # ms
        while timeout > 0:
            status = self.read_status()
            if status is None:
                return None
            if not (status & 0x80):  # check flag
                break
            time.sleep(0.001)
            timeout -= 1
        
        if timeout <= 0:
            print("timeout")
            return None
        
        # read 6-bit data
        raw_data = self._read_data(6)
        if not raw_data or len(raw_data) != 6:
            return None
        
        return raw_data
    
    def get_temperature_humidity(self):
        """ Get data """
        raw_data = self.read_raw_data()
        if not raw_data:
            return (None, None)
        
        # origin data
        hum_raw = ((raw_data[1] << 12) | (raw_data[2] << 4) | (raw_data[3] >> 4))
        temp_raw = (((raw_data[3] & 0x0F) << 16) | (raw_data[4] << 8) | raw_data[5])
        
        # calculate
        humidity = (hum_raw / 0x100000) * 100
        temperature = (temp_raw / 0x100000) * 200 - 50
        
        # humidity range
        humidity = max(0.0, min(100.0, humidity))
        temperature = max(-40.0, min(85.0, temperature))
        
        return (temperature, humidity)

def main():
    sensor = AHT10()
    
    if not sensor.open():
        return 1
    
    try:
        while True:
            temp, hum = sensor.get_temperature_humidity()
            if temp is not None and hum is not None:
                print(f"Temperature: {temp:.2f} C, Humidity: {hum:.2f}%")
            else:
                print("Faild to get data")
            
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\nExit project")
    
    finally:
        sensor.close()
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

保存代码,终端执行指令 python3 I2C_Sensor_AHT10.py 运行采集数据并打印的程序。

效果

终端输出采集的温湿度数据

Home Assistant

在实现传感器驱动及串口打印的基础上,进一步实现 Home Assistant (HA) 智能家居平台的连接,实现远程温湿度计。

方案

实现数据上传至 Home Assistant 有多种方案,包括但不限于

  • 使用 Home Assistant RESTful Sensor;
  • 使用 Node-RED 作为中间件;
  • 通过 Telegraf 和 InfluxDB;
  • 直接写入 SQL 数据库;
  • 使用 Syslog 集成;
  • Socket 通过 EMQX MQTT 代理实现数据转发;

结合开发板资源以及官方软件库的支持范围,选择 Socket 和 EMQX 代理的方案。

流程图

代码

新建 aht10_emqx_auth 文件,添加如下代码

# aht10_emqx_auth.py
import socket
import json
import time
import struct
from I2C_Sensor_AHT10 import AHT10

EMQX_HOST = "192.168.1.121"  # EMQX Server IP
EMQX_PORT = 1883             # MQTT default port
MQTT_USER = "LJL"            # EMQX username
MQTT_PASS = "4421989g"       # EMQX passward
CLIENT_ID = "vf2-aht10"      # Client ID
TOPIC = "home/sensor/aht10"  # MQTT topic
INTERVAL = 2                 # delay

def connect_mqtt(sock):
    """MQTT"""
    # MQTT protocol
    protocol_name = b"MQTT"
    protocol_level = 4  # MQTT 3.1.1
    
    # top
    connect_flags = 0xC2  # 
    keep_alive = 60  # 
    
    # load
    client_id_bytes = CLIENT_ID.encode()
    username_bytes = MQTT_USER.encode()
    password_bytes = MQTT_PASS.encode()
    
    # calculate length
    remaining_length = (
        2 + len(protocol_name) + 1 + 1 + 2 +  # 
        2 + len(client_id_bytes) +             # 
        2 + len(username_bytes) +             # 
        2 + len(password_bytes)               # 
    )
    # build packet
    packet = bytearray()
    packet.append(0x10)  # 
    
    # code
    while remaining_length > 0:
        byte = remaining_length % 128
        remaining_length = remaining_length // 128
        if remaining_length > 0:
            byte = byte | 0x80
        packet.append(byte)
    
    # add protocol name
    packet.extend(struct.pack("!H", len(protocol_name)))
    packet.extend(protocol_name)
    
    # add flag
    packet.extend([protocol_level, connect_flags])
    packet.extend(struct.pack("!H", keep_alive))
    
    # add client id
    packet.extend(struct.pack("!H", len(client_id_bytes)))
    packet.extend(client_id_bytes)
    
    # add user name
    packet.extend(struct.pack("!H", len(username_bytes)))
    packet.extend(username_bytes)
    
    # add password
    packet.extend(struct.pack("!H", len(password_bytes)))
    packet.extend(password_bytes)
    
    sock.sendall(packet)
    resp = sock.recv(4)
    if len(resp) < 4 or resp[0] != 0x20 or resp[3] != 0x00:
        raise ConnectionError("MQTT authentication failed")

def publish_data(sock, temp, hum):
    """ publish MQTT data """
    payload = json.dumps({
        "temperature": round(temp, 1),
        "humidity": round(hum, 1)
    }).encode()
    
    # PUBLISH words
    packet = bytearray()
    packet.append(0x30)  # PUBLISH QoS 0
    
    # calculate remain length
    remaining_length = 2 + len(TOPIC) + len(payload)
    
    # just
    temp_len = remaining_length
    while temp_len > 0:
        byte = temp_len % 128
        temp_len = temp_len // 128
        if temp_len > 0:
            byte = byte | 0x80
        packet.append(byte)
    
    # message topic
    packet.extend(struct.pack("!H", len(TOPIC)))
    packet.extend(TOPIC.encode())
    
    # message boday
    packet.extend(payload)
    
    sock.sendall(packet)

def main():
    sensor = AHT10()
    if not sensor.open():
        return

    while True:
        try:
            with socket.socket() as s:
                s.connect((EMQX_HOST, EMQX_PORT))
                connect_mqtt(s)  # MQTT connect
                print("Connected & authenticated to EMQX")
                
                while True:
                    temp, hum = sensor.get_temperature_humidity()
                    if None not in (temp, hum):
                        publish_data(s, temp, hum)
                        print(f"Published: {temp} C, {hum}%")
                    time.sleep(INTERVAL)
                    
        except (ConnectionError, OSError) as e:
            print(f"Connection error: {e}, retrying in 5s...")
            time.sleep(5)
        except KeyboardInterrupt:
            break
        finally:
            sensor.close()

if __name__ == "__main__":
    main()

保存代码,终端执行指令 aht10_emqx_auth.py ,程序运行。

测试

使用 MQTTX 软件作为 MQTT Broker, 测试 EMQX 服务器消息接收情况

  • 新建连接,输入 EMQX 建立的客户端节点的 IP 地址,用户名和密码,连接至 EMQX 平台;
  • 在此 MQTT Broker 新建对话主题 home/sensor/aht10 ,即可接收 EMQX 客户端发送的消息


网页端

将发送的 AHT10 数据添加至 Home Assistant 平台,实现网页端的 Home Assistant 传感器数据显示。

修改 configuration.yaml 配置文件

mqtt:
  sensor:
    - name: "AHT10/Temperature"
      state_topic: "home/sensor/aht10"
      suggested_display_precision: 1
      unit_of_measurement: "C"
      value_template: "{{ value_json.temperature }}"
    - name: "AHT10/Humidity"
      state_topic: "home/sensor/aht10"
      suggested_display_precision: 1
      unit_of_measurement: "%"
      value_template: "{{ value_json.humidity }}"

应用配置并刷新网页,可在传感器标签页下找到 AHT10 传感器数据显示

分别点击 Humidity 和 Temperature 查看历史数据。

湿度历史曲线

温度历史曲线

APP

依托 HA 丰富的生态建设,在移动端可通过 Home Assistant 应用实现传感器数据的实时监控。

传感器连接开发板的实物图,显示了智能手机远程数据采集和环境温湿度监控


总结

本文介绍了昉·星光2 (StarFive2)单板计算机通过 IIC 协议驱动 SHT40 和 AHT10 温湿度传感器,进一步连接 Home Assistant 实现远程数据采集和环境监控的项目设计,为该开发板在物联网领域的应用提供了参考。


全部评论
暂无评论
0/144