AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


构建基于 Raspberry Pi 树莓派的智能 AI 家居安全系统

发布日期:2025-02-16 07:32:01 浏览次数: 1662 来源:红薯 AI 胡说
推荐语

利用树莓派打造智能AI家居安全系统,实时识别家庭成员,检测陌生人并识别包裹递送。

核心内容:
1. 树莓派5与AI套件的硬件要求
2. 软件设置与环境配置步骤
3. 编写智能安全系统代码实现

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

在这篇文章中,我们将使用 Raspberry Pi 和 AI 功能创建一个复杂的家庭安全系统。我们的系统将识别家庭成员、检测陌生人并识别包裹递送,同时发送实时网络通知。

硬件要求

  1. Raspberry Pi 5(这是我默认使用的那个)
  2. Hailo 8L Raspberry Pi AI 套件(在此处提供)
  3. NexiGo 网络摄像头(功能物超所值,请在此处查看)
  4. 互联网连接(幸运的是,Raspberry Pi 5 带有集成的 Wi-Fi 连接并具有以太网端口)

软件设置

首先,让我们使用必要的软件设置我们的 Raspberry Pi。我将跳过作系统安装,因为 CanaKit Raspberry Pi 5 套件附带了预装了系统的 SD 卡:

sudo apt update && sudo apt full-upgrade
sudo apt install hailo-all
git clone https://github.com/hailo-ai/hailo-rpi5-examples.git
cd hailo-rpi5-examples
source setup_env.sh
./compile_postprocess.sh
pip3 install opencv-python-headless numpy supervision pushbullet.py face_recognition

代码

现在,让我们创建 smart_security_system.py 文件:

import cv2
import numpy as np
import supervision as sv
import face_recognition
import time
from hailo_rpi_common import GStreamerApp, app_callback_class
from pushbullet import Pushbullet
from hailo_models import YoloV5PostProcessing
# Initialize Pushbullet for notifications
pb = Pushbullet("YOUR_API_KEY")
# Load known faces
known_face_encodings = []
known_face_names = []

defload_known_faces(directory):
    for filename in os.listdir(directory):
        if filename.endswith(".jpg"or filename.endswith(".png"):
            image = face_recognition.load_image_file(os.path.join(directory, filename))
            encoding = face_recognition.face_encodings(image)[0]
            known_face_encodings.append(encoding)
            known_face_names.append(os.path.splitext(filename)[0])

load_known_faces("known_faces")

# Initialize YOLOv5 object detection
yolo_postprocess = YoloV5PostProcessing()
@app_callback_class
classSmartSecurityCallback:
    def__init__(self):
        self.last_notification_time = 0
        self.face_locations = []
        self.face_names = []
        self.process_this_frame = True
    
    defapp_callback(self, buffer, caps):
        frame = self.get_numpy_from_buffer(buffer, caps)
        
        # Resize frame for faster face recognition processing
        small_frame = cv2.resize(frame, (00), fx=0.25, fy=0.25)
        rgb_small_frame = small_frame[:, :, ::-1]
        
        if self.process_this_frame:
            # Find all faces in the current frame
            self.face_locations = face_recognition.face_locations(rgb_small_frame)
            face_encodings = face_recognition.face_encodings(rgb_small_frame, self.face_locations)
            self.face_names = []
            for face_encoding in face_encodings:
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                name = "Unknown"
                face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
                best_match_index = np.argmin(face_distances)
                if matches[best_match_index]:
                    name = known_face_names[best_match_index]
                self.face_names.append(name)
        
        self.process_this_frame = not self.process_this_frame
        # Detect objects (people and packages)
        detections = yolo_postprocess.postprocess(frame)
        
        for detection in detections:
            if detection.class_id == 0:  # Person
                if"Unknown"in self.face_names:
                    self.send_notification("Stranger detected")
                else:
                    self.send_notification(f"Family member detected: {', '.join(set(self.face_names))}")
            
            elif detection.class_id == 39:  # Package (assuming class ID 39 for package in COCO dataset)
                self.send_notification("Package delivery detected")
    
    defsend_notification(self, message):
        current_time = time.time()
        if current_time - self.last_notification_time > 60:  # Limit to one notification per minute
            push = pb.push_note("Smart Security Alert", message)
            self.last_notification_time = current_time
    
    defget_numpy_from_buffer(self, buffer, caps):
        # Convert GStreamer buffer to numpy array
        # Implementation depends on the specific GStreamer setup
        pass

defmain():
    app = GStreamerApp("Smart Security System", SmartSecurityCallback())
    app.run()
if __name__ == "__main__":
    main()

很多人问我如何将 GStreamer 的 cap 缓冲区转换为 NumPy 数组,因此在这里我向大家分享我的解决方案,特别是在 cap 是视频的情况下:

import numpy as np
import gi
gi.require_version('Gst''1.0')
from gi.repository import Gst

defget_numpy_from_buffer(self, buffer, caps):
    """
    Convert GStreamer buffer to numpy array
    
    :param buffer: Gst.Buffer
    :param caps: Gst.Caps
    :return: numpy.ndarray
    """

    # Get the Gst.Structure from the caps
    structure = caps.get_structure(0)
    
    # Get the width and height of the video frame
    width = structure.get_value("width")
    height = structure.get_value("height")
    
    # Get the pixel format (assuming it's in caps)
    format_info = structure.get_value("format")
    
    # Map the buffer to memory
    success, map_info = buffer.map(Gst.MapFlags.READ)
    ifnot success:
        raise ValueError("Could not map buffer")
    
    try:
        # Get the data from the mapped buffer
        data = map_info.data
        
        # Determine the data type and shape based on the pixel format
        if format_info == "RGB":
            dtype = np.uint8
            shape = (height, width, 3)
        elif format_info == "RGBA":
            dtype = np.uint8
            shape = (height, width, 4)
        elif format_info == "GRAY8":
            dtype = np.uint8
            shape = (height, width)
        elif format_info == "GRAY16_LE":
            dtype = np.uint16
            shape = (height, width)
        else:
            raise ValueError(f"Unsupported format: {format_info}")
        
        # Create numpy array from the buffer data
        array = np.ndarray(shape=shape, dtype=dtype, buffer=data)
        
        # Make a copy of the array to ensure it's not tied to the original buffer
        return np.array(array)
    
    finally:
        # Unmap the buffer
        buffer.unmap(map_info)

此实现执行以下作:

  1. 它从 GStreamer Cap 中提取宽度、高度和像素格式。
  2. 它将缓冲区映射到内存以访问其数据。
  3. 根据像素格式,它确定适当的 numpy 数据类型和形状。
  4. 它从缓冲区数据创建一个 numpy 数组。
  5. 最后,它返回数组的副本,以确保它不绑定到原始缓冲区。

请注意,此实现采用某些像素格式(RGB、RGBA、GRAY8 GRAY16_LE)。您可能需要添加更多格式处理,具体取决于您的特定用例。此外,请确保您已安装必要的 GStreamer 和 numpy 依赖项:

pip install numpy PyGObject

您可能还需要在系统上安装 GStreamer 开发库。在 Ubuntu 或 Debian 上,您可以通过以下方式执行此作:

sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev

如何运作

  1. 人脸识别:系统使用 face_recognition 库来识别已知人脸。您需要使用家庭成员的图像填充 known_faces 目录。
  2. 对象检测:我们使用 Hailo 8L 加速的 YOLOv5 来检测视频流中的人员和包裹。
  3. 通知: 当系统检测到陌生人、家庭成员或包裹递送时,系统会通过 Pushbullet 发送 Web 通知。
  4. 性能:通过利用 Hailo 8L AI 加速器,我们实现了对视频源的实时处理,确保了快速响应时间。

设置系统

  1. 将 “YOUR_API_KEY” 替换为您的实际 Pushbullet API 密钥。
  2. 将家庭成员的照片添加到known_faces目录,并使用人员的姓名(例如,john.jpg)命名每个文件。
  3. 如果需要,根据您在 Hailo 8L 上使用的特定模型,调整 YoloV5PostProcessing 参数。

结论

这个智能家居安全系统展示了将 Raspberry Pi 与 AI 功能相结合的强大功能。Hailo 8L Raspberry Pi AI 套件提供了实时运行复杂 AI 模型所需的处理能力,而 NexiGo 网络摄像头则确保了高质量的视频输入。

通过构建此系统,您不仅可以增强家庭安全性,还可以获得 AI 和计算机视觉方面的宝贵经验。扩展的可能性是无穷无尽的——您可以添加入侵者警报、宠物检测或与智能家居设备集成等功能。

请记住,出色的 DIY 项目的关键是选择正确的组件。Hailo 8L 套件和 NexiGo 相机在性能和价值之间实现了出色的平衡,使其成为该项目的理想选择。


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询