26 主程序架构介绍

来自Waveshare Wiki
跳转至: 导航搜索

主程序架构介绍

文件结构和功能介绍

  • ugv_pt_rpi
    • [文件夹] AccessPopup (用于网络连接相关功能)
    • [文件夹] sounds (用于存储音频文件,可以在这里配置语音包)
    • [文件夹] static (用于存储拍摄的照片)
    • [文件夹] templates (WEB 应用的相关资源)
    • [文件夹] tutorial_cn (中文版交互式教程)
    • [文件夹] tutorial_en (英文版交互式教程)
    • [文件夹] videos (用于存储录制的视频)
    • app.py (产品主程序,包括 web-socket 和 flask 相关功能)
    • asound.conf (声卡配置文件)
    • audio_ctrl.py (音频功能相关的库)
    • autorun.sh (配置产品主程序和 jupyterlab 开机自动运行的脚本)
    • base_camera.py (flask 实时视频的底层多线程采集的库,原项目为 flask-video-streaming)
    • base_ctrl.py (与下位机通信的库,通过串口与下位机进行通信)
    • config.yaml (配置文件,用于配置一些参数)
    • requirements.txt (python项目依赖库)
    • robot_ctrl.py (机器人动作、视觉处理相关的库)
    • serial_simple_ctrl.py (独立的程序,用于测试串口通信)
    • setup.sh (自动安装脚本)
    • start_jupyter.sh (开启 jupyterlab 服务器)

安装脚本

在项目文件夹中有一个名为 setup.sh 的文件,使用 shell 编写,可以帮助自动配置机器人产品的上位机,包括设置串口、设置摄像头、创建项目虚拟环境和安装依赖库等。这些步骤在我们出厂的SD卡的镜像中都是已经配置好的了。

安装脚本使用方法,安装过程需要从网络下载并安装很多依赖库,对于网络环境比较特殊的地区我们推荐你使用直接从我们官网下载镜像文件的方法来安装产品。

自动运行程序

项目文件夹中的 autorun.sh 用于配置产品主程序(app.py)和 jupyterlab(start_jupyter.sh)的开机自动运行(以用户身份而非 root),同时生成 jupyterlab 的配置文件。

app.py 介绍(v0.89)

以下代码块仅用作展示,不可以运行

导入 Flask 应用和 json 相关的库,用于构建 WEB 应用:

from importlib import import_module
import os, socket, psutil
import subprocess, re, netifaces
from flask import Flask, render_template, Response, jsonify, request, send_from_directory, send_file
from werkzeug.utils import secure_filename
import json

导入 web-socket 相关的库:

from flask_socketio import SocketIO, emit

导入和机器人控制相关的库,包括视觉功能和动作控制等:

from robot_ctrl import Camera
from robot_ctrl import RobotCtrlMiddleWare

导入其它库:

import time    # 时间函数库
import logging    # 用于设置flask应用的输出信息
import threading    # 多线程函数库

import yaml    # 用于读取 .yaml 配置文件

打开 config.yaml 配置文件,获取配置文件中的参数:

curpath = os.path.realpath(__file__)
thisPath = os.path.dirname(curpath)
with open(thisPath + '/config.yaml', 'r') as yaml_file:
    f = yaml.safe_load(yaml_file)

robot_name  = f['base_config']['robot_name']
sbc_version = f['base_config']['sbc_version']

实例化 Flask 应用并配置输出(关闭输出):

app = Flask(__name__)
log = logging.getLogger('werkzeug')
log.disabled = True

实例化 web-socket功能(用于网页客户端与服务器通信),视频相关功能(实时视频、Opencv),机器人动作控制(运动,灯光,云台,获取底盘反馈等):

socketio = SocketIO(app)
camera = Camera()
robot = RobotCtrlMiddleWare()

网络相关设置:

net_interface = "wlan0" # 设置无线网卡,板载是wlan0,usb的是其它数字
wifi_mode = "None"
# 存储WIFI模式,这个变量会显示在OLED屏幕上
eth0_ip = None # 网口的IP地址,会显示在OLED屏幕上
wlan_ip = None # 无线网络(net_interface)的IP地址,会显示在OLED屏幕上

网页拖拽上传音频文件的存储路径:

UPLOAD_FOLDER = thisPath + '/sounds/others'

用于存储上位机信息的一些变量(主程序运行时,这些变量会由其它函数更新):

pic_size = 0;
vid_size = 0;
cpu_read = 0;
cpu_temp = 0;
ram_read = 0;
rssi_read= 0;

可执行的指令与对应函数的字典,每个指令有一个对应的代号,存储在 config.yaml 文件中,这里使用字典来选择需要执行的指令,因为指令数量太多所以这里不能用一大堆 if else 来写,那样的话会严重影响可读性。

cmd_actions = {
    f['code']['min_res']: lambda: camera.set_video_resolution("240P"),
    f['code']['mid_res']: lambda: camera.set_video_resolution("480P"),
    f['code']['max_res']: lambda: camera.set_video_resolution("960P"),
    f['code']['zoom_x1']: lambda: camera.scale_frame(1),
    f['code']['zoom_x2']: lambda: camera.scale_frame(2),
    f['code']['zoom_x4']: lambda: camera.scale_frame(4),
    f['code']['pic_cap']: lambda: camera.capture_frame(thisPath + '/static/'),
    f['code']['vid_sta']: lambda: camera.record_video(1, thisPath + '/videos/'),
    f['code']['vid_end']: lambda: camera.record_video(0, thisPath + '/videos/'),
    f['code']['cv_none']: lambda: camera.set_cv_mode(f['code']['cv_none']),
    f['code']['cv_moti']: lambda: camera.set_cv_mode(f['code']['cv_moti']),
    f['code']['cv_face']: lambda: camera.set_cv_mode(f['code']['cv_face']),
    f['code']['cv_objs']: lambda: camera.set_cv_mode(f['code']['cv_objs']),
    f['code']['cv_clor']: lambda: camera.set_cv_mode(f['code']['cv_clor']),
    f['code']['cv_hand']: lambda: camera.set_cv_mode(f['code']['cv_hand']),
    f['code']['cv_auto']: lambda: camera.set_cv_mode(f['code']['cv_auto']),
    f['code']['mp_face']: lambda: camera.set_cv_mode(f['code']['mp_face']),
    f['code']['mp_pose']: lambda: camera.set_cv_mode(f['code']['mp_pose']),
    f['code']['re_none']: lambda: camera.set_detection_reaction(f['code']['re_none']),
    f['code']['re_capt']: lambda: camera.set_detection_reaction(f['code']['re_capt']),
    f['code']['re_reco']: lambda: camera.set_detection_reaction(f['code']['re_reco']),
    f['code']['mc_lock']: lambda: camera.set_movtion_lock(f['code']['mc_lock']),
    f['code']['mc_unlo']: lambda: camera.set_movtion_lock(f['code']['mc_unlo']),
    f['code']['led_off']: robot.set_led_mode_off,
    f['code']['led_aut']: robot.set_led_mode_auto,
    f['code']['led_ton']: robot.set_led_mode_on,
    f['code']['base_of']: robot.set_base_led_off,
    f['code']['base_on']: robot.set_base_led_on,
    f['code']['head_ct']: robot.head_led_ctrl,
    f['code']['base_ct']: robot.base_led_ctrl,
    f['code']['s_panid']: camera.set_pan_id,
    f['code']['release']: camera.release_torque,
    f['code']['set_mid']: camera.middle_set,
    f['code']['s_tilid']: camera.set_tilt_id
}

网页加载时也需要问服务器要 config.yaml文件,用于网页上面的一些信息的配置,例如显示产品名称或者与服务端统一上面指令锁对应的代号,网页通过“/config”路由来发送请求,服务器在这个路由中给网页客户端返回它所需要的 config.yaml 文件。

@app.route('/config')
def get_config():
    with open(thisPath + '/config.yaml', 'r') as file:
        yaml_content = file.read()
    return yaml_content

获取 WIFI 信号强度,参数为无线网卡的名称(一个设备上有可能会包含多个无线网卡)。

def get_signal_strength(interface):
    try:
        output = subprocess.check_output(["/sbin/iwconfig", interface]).decode("utf-8")
        signal_strength = re.search(r"Signal level=(-\d+)", output)
        if signal_strength:
            return int(signal_strength.group(1))
        return 0
    except FileNotFoundError:
        print("iwconfig command not found. Please ensure it's installed and in your PATH.")
        return -1
    except subprocess.CalledProcessError as e:
        print(f"Error executing iwconfig: {e}")
        return -1
    except Exception as e:
        print(f"An error occurred: {e}")
        return -1

获取 WIFI 模式,判断 WIFI 是 AP 还是 STA 模式。

def get_wifi_mode():
    global wifi_mode
    try:
        result = subprocess.check_output(['/sbin/iwconfig', 'wlan0'], encoding='utf-8')

        if "Mode:Master" in result or "Mode:AP" in result:
            wifi_mode = "AP"
            return "AP"

        if "Mode:Managed" in result:
            wifi_mode = "STA"
            return "STA"

    except subprocess.CalledProcessError as e:
        print(f"Error checking Wi-Fi mode: {e}")
        return None

    return None

获取 IP 地址,需要的参数为网卡名称:

def get_ip_address(interface):
    try:
        interface_info = netifaces.ifaddresses(interface)

        ipv4_info = interface_info.get(netifaces.AF_INET, [{}])
        return ipv4_info[0].get('addr')
    except ValueError:
        print(f"Interface {interface} not found.")
        return None
    except IndexError:
        print(f"No IPv4 address assigned to {interface}.")
        return None

获取 CPU 使用率,次函数会产生阻塞,阻塞时间为 cpu_percent() 的 interval 参数:

def get_cpu_usage():
    return psutil.cpu_percent(interval=2)

获取 CPU 的温度:

def get_cpu_temperature():
    try:
        temperature_str = os.popen('vcgencmd measure_temp').readline()
        temperature = float(temperature_str.replace("temp=", "").replace("'C\n", ""))
        return temperature
    except Exception as e:
        print("Error reading CPU temperature:", str(e))
        return None

获取运行内存使用率:

def get_memory_usage():
    return psutil.virtual_memory().percent

这个函数里面集成了上面那些函数,用于获取各种信息后给相应的变量复制,方便程序的其它部分调用那些变量:

def update_device_info():
    global pic_size, vid_size, cpu_read, ram_read, rssi_read, cpu_temp
    cpu_read = get_cpu_usage()
    cpu_temp = get_cpu_temperature()
    ram_read = get_memory_usage()
    rssi_read= get_signal_strength(net_interface)

用于循环获取各类反馈信息并合并,然后发送给 web 客户端,这行函数在主程序启动时(第一次客户端建立连接时)会由 threading 来生成一个单独的线程来执行,不会影响主程序的执行,这个函数会从 camera.get_status() 中获取底盘的信息和其它视觉功能相关的信息,反馈给 web 客户端一个合并好的 json,反馈信息路由为 “/ctrl”

这个函数的运行频率为10Hz,但是有些信息的获取频率并不是10Hz,这个频率是 camera.get_status() 中信息的获取频率,其余的例如文件夹大小以及CPU内存占用率这些的获取频率会低得多,因为那些信息占用的资源比较多。

def update_data_websocket():
    while 1:
        try:
            fb_json = camera.get_status()
        except:
            continue
        socket_data = {
                    f['fb']['picture_size']:pic_size,
                    f['fb']['video_size']:  vid_size,
                    f['fb']['cpu_load']:    cpu_read,
                    f['fb']['cpu_temp']:    cpu_temp,
                    f['fb']['ram_usage']:   ram_read,
                    f['fb']['wifi_rssi']:   rssi_read
                    }
        try:
            socket_data.update(fb_json)
            socketio.emit('update', socket_data, namespace='/ctrl')
        except:
            pass
        time.sleep(0.1)

后面所有 @app.route() 都是 Flask 应用的路由函数装饰器,路由用于区分客户端发来的请求的类型,不同类型的请求会使用不同的函数来处理。

这个路由为主路由,当有客户端连接时(当有人访问 IP:5000 页面时),会随机播放 sounds/connected 文件夹内的一个音频文件,并且服务器会返回 WEB 应用的主控界面,主控界面的 html 文件为 index.html。

@app.route('/')
def index():
    """Video streaming home page."""
    robot.play_random_audio("connected", False)
    return render_template('index.html')

用于给客户端发送各类文件的路由: css,js,照片,视频。

@app.route('/<path:filename>')
def serve_static(filename):
    return send_from_directory('templates', filename)


@app.route('/photo/<path:filename>')
def serve_static_photo(filename):
    return send_from_directory('templates', filename)


@app.route('/video/<path:filename>')
def serve_static_video(filename):
    return send_from_directory('templates', filename)

用于打开设置页面的路由:

@app.route('/settings/<path:filename>')
def serve_static_settings(filename):
    return send_from_directory('templates', filename)

用于从设置页面返回到主页的路由:

@app.route('/index')
def serve_static_home(filename):
    return redirect(url_for('index'))

用于获取实时视频画面的函数,来自于开源项目 falsk-video-streaming:

def gen(cameraInput):
    """Video streaming generator function."""
    yield b'--frame\r\n'
    while True:
        frame = cameraInput.get_frame()
        yield b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n--frame\r\n'

用于在网页上显示实时视频画面的路由:

@app.route('/video_feed')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    return Response(gen(camera),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

用于获取照片文件夹内照片名称列表的路由:

@app.route('/get_photo_names')
def get_photo_names():
    photo_files = sorted(os.listdir(thisPath + '/static'), key=lambda x: os.path.getmtime(os.path.join(thisPath + '/static', x)), reverse=True)
    return jsonify(photo_files)

用于给网页发送图片的路由:

@app.route('/get_photo/<filename>')
def get_photo(filename):
    return send_from_directory(thisPath + '/static', filename)

用于删除图片的路由:

@app.route('/delete_photo', methods=['POST'])
def delete_photo():
    filename = request.form.get('filename')
    try:
        os.remove(os.path.join(thisPath + '/static', filename))
        return jsonify(success=True)
    except Exception as e:
        print(e)
        return jsonify(success=False)

下面几个函数是类似的功能,只是用来操作视频的:

@app.route('/delete_video', methods=['POST'])
def delete_video():
    filename = request.form.get('filename')
    try:
        os.remove(os.path.join(thisPath + '/videos', filename))
        return jsonify(success=True)
    except Exception as e:
        print(e)
        return jsonify(success=False)


@app.route('/get_video_names')
def get_video_names():
    video_files = sorted(
        [filename for filename in os.listdir(thisPath + '/videos/') if filename.endswith('.mp4')],
        key=lambda filename: os.path.getctime(os.path.join(thisPath + '/videos/', filename)),
        reverse=True
    )
    return jsonify(video_files)


@app.route('/videos/<path:filename>')
def videos(filename):
    return send_from_directory(thisPath + '/videos', filename)

使用遍历内部每个文件的方法来获取某个文件夹的大小,会产生比较大的资源占用

def get_folder_size(folder_path):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            total_size += os.path.getsize(file_path)
    # Convert total_size to MB
    size_in_mb = total_size / (1024 * 1024)
    return round(size_in_mb,2)

web-socket 的路由,用于接收来自客户端的 json 指令,有些指令是高频的,并且需要尽量低的延迟,所以这里使用 web-socket 而不是使用 http,web-socket 是有连接的,一次连接多次通信;http 是无连接的,每次发送请求都需要先建立连接-通信-再销毁连接,http不适合高频低延迟的通信(http优点是简单)。

@socketio.on('json', namespace='/json')
def handle_socket_json(json):
    try:
        robot.json_command_handler(json)
    except Exception as e:
        print("Error handling JSON data:", e)
        return

更新 OLED 上面显示的信息的函数,这个函数在主程序运行时由 threading 生成一个独立的线程运行,上面有个以10Hz频率向网页反馈信息的函数,那里面有一些变量是由这个函数来更新的,这里的信息对实时性要求不高,所以这个函数里面循环获取信息的频率更低。(这个函数里面不是用 time.sleep() 来做延迟的,是用 update_device_info() 里面的 get_cpu_usage() 来实现延迟的)

def oled_update():
    global eth0_ip, wlan_ip
    robot.base_oled(0, f"E: No Ethernet")
    robot.base_oled(1, f"W: NO {net_interface}")
    robot.base_oled(2, "F/J:5000/8888")
    get_wifi_mode()
    start_time = time.time()
    last_folder_check_time = 0

    while True:
        current_time = time.time()

        if current_time - last_folder_check_time > 600:
            pic_size = get_folder_size(thisPath + '/static')
            vid_size = get_folder_size(thisPath + '/videos')
            last_folder_check_time = current_time
        
        update_device_info() # the interval of this loop is set in here
        get_wifi_mode()

        if get_ip_address('eth0') != eth0_ip:
            eth0_ip = get_ip_address('eth0');
            if eth0_ip:
                robot.base_oled(0, f"E:{eth0_ip}")
            else:
                robot.base_oled(0, f"E: No Ethernet")

        if get_ip_address(net_interface) != wlan_ip:
            wlan_ip = get_ip_address(net_interface)
            if wlan_ip:
                robot.base_oled(1, f"W:{wlan_ip}")
            else:
                robot.base_oled(1, f"W: NO {net_interface}")

        elapsed_time = current_time - start_time
        hours = int(elapsed_time // 3600)
        minutes = int((elapsed_time % 3600) // 60)
        seconds = int(elapsed_time % 60)
        robot.base_oled(3, f"{wifi_mode} {hours:02d}:{minutes:02d}:{seconds:02d} {rssi_read}dBm")

这个是用于处理客户端发来的命令行信息的路由:

@app.route('/send_command', methods=['POST'])
def handle_command():
    command = request.form['command']
    print("Received command:", command)
    # camera.info_update("CMD:" + command, (0,255,255), 0.36)
    camera.cmd_process(command)
    return jsonify({"status": "success", "message": "Command received"})

用于获取保存在 sounds/other 文件夹内音频文件列表的路由:

@app.route('/getAudioFiles', methods=['GET'])
def get_audio_files():
    files = [f for f in os.listdir(UPLOAD_FOLDER) if os.path.isfile(os.path.join(UPLOAD_FOLDER, f))]
    return jsonify(files)

用于实现网页端拖拽上传功能的路由,被上传的音频文件保存在 sounds/other 文件夹内:

@app.route('/uploadAudio', methods=['POST'])
def upload_audio():
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'})
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'})
    if file:
        filename = secure_filename(file.filename)
        file.save(os.path.join(UPLOAD_FOLDER, filename))
        return jsonify({'success': 'File uploaded successfully'})

用于播放 sounds/other 文件夹内音频的路由:

@app.route('/playAudio', methods=['POST'])
def play_audio():
    audio_file = request.form['audio_file']
    print(thisPath + '/sounds/others/' + audio_file)
    robot.audio_play(thisPath + '/sounds/others/' + audio_file)
    return jsonify({'success': 'Audio is playing'})

用于停止播放的路由:

@app.route('/stop_audio', methods=['POST'])
def audio_stop():
    robot.audio_stop()
    return jsonify({'success': 'Audio stop'})

这个函数用于开机自动执行一些命令行指令,在主程序启动时会自动执行该函数,可以随意增加这些指令的数量。

  • base -c {"T":142,"cmd":50} : 用于给底盘发送设置反馈时间间隔的指令,底盘默认循环中没有延迟,给50参数代表格外增加50ms的延迟,有助于提升上位机效率(上位机解码下位机的串口信息也算占用资源)
  • base -c {"T":131,"cmd":1} :开启底盘流反馈,这样反馈信息就不是一问一答的了,底盘可以自动连续发送反馈信息(虽然 ROS Driver 默认也是这个模式,但是还是写在这里了)
  • base -c {"T":143,"cmd":0} :关闭回音,这样发给底盘信息时,底盘不会反馈你发送的原信息(因为没必要,还会占用上位机的串口解码资源,尤其是用高频指令控制底盘时)
  • base -c {"T":4,"cmd":2} :用于设置底盘的外设类型,0 没有外设,1 机械臂, 2 云台
  • base -c {"T":300,"mode":0,"mac":"EF:EF:EF:EF:EF:EF"} :底盘不会被 ESP-NOW 广播信号控制,只能被 EF:EF:EF:EF:EF:EF 这个 MAC 地址发来的ESP-NOW指令控制,你可以随意更改这个 MAC 地址(因为有上位机的程序成功运行了所以就不需要被 ESP-NOW 广播控制了)
  • send -a -b :将广播地址添加到下位机的 ESP-NOW peer 中,方便后续使用设备间通信功能。
def cmd_on_boot():
    cmd_list = [
        'base -c {"T":142,"cmd":50}',   # set feedback interval
        'base -c {"T":131,"cmd":1}',    # serial feedback flow on
        'base -c {"T":143,"cmd":0}',    # serial echo off
        'base -c {"T":4,"cmd":2}',      # select the module - 0:None 1:RoArm-M2-S 2:Gimbal
        'base -c {"T":300,"mode":0,"mac":"EF:EF:EF:EF:EF:EF"}',  # the base won't be ctrl by esp-now broadcast cmd, but it can still recv broadcast megs.
        'send -a -b'    # add broadcast mac addr to peer
    ]
    for i in range(0, len(cmd_list)):
        camera.cmd_process(cmd_list[i])

主程序运行时会执行的内容:

if __name__ == '__main__':
    # 随机播放一个 sounds/robot_started 文件夹内音频文件
    robot.play_random_audio("robot_started", False)
    
    # LED 灯亮起
    robot.set_led_mode_on()
    
    # 为 update_data_websocket() 创建一个单独的线程
    date_update_thread = threading.Thread(target=update_data_websocket, daemon=True)
    date_update_thread.start()
    
    # 为 update_data_websocket() 创建一个单独的线程
    oled_update_thread = threading.Thread(target=oled_update, daemon=True)
    oled_update_thread.start()
    
    # 获取照片文件夹大小         
    pic_size = get_folder_size(thisPath + '/static')
    
    # 获取视频文件夹大小   
    vid_size = get_folder_size(thisPath + '/videos')
    
    # LED 灯关闭
    robot.set_led_mode_off()
    
    # 开机自动运行的命令行指令
    cmd_on_boot()
    
    # 开启 flask 应用服务器
    socketio.run(app, host='0.0.0.0', port=5000, allow_unsafe_werkzeug=True)