🚀
PX4 - MAVSDK Python 프로그래밍
  • MAVSDK 개요
  • 프로그래밍 전제 조건
  • MAVSDK Python 개발 환경 설정
  • MAVSDK 라이브러리 클래스
  • Python "asyncio" 이해
  • MAVSDK-Python 샘플 프로그램 분석
  • OFFBOARD 모드 이해하기
  • OFFBOARD 위치 제어 (NED 기준)
  • OFFBOARD 속도 제어 (BODY 기준)
  • OFFBOARD 속도 제어 (NED 기준)
  • OFFBOARD 위치, 속도 제어 (NED 기준)
  • Keyboard 입력을 이용한 기체 이동
  • 문제 해결
  • MAVSDK Server
  • 교육 안내
Powered by GitBook
On this page
  • 1. MAVSDK-Python은 asyncio 기반 라이브러리입니다
  • 2. ayncio 핵심 문법 구조
  • ✅ 1. async def 함수 정의
  • ✅ 2. await 키워드
  • ✅ 3. asyncio.run() — 프로그램 진입점
  • ✅ 4. asyncio.create_task() — 병렬 작업 실행
  • ✅ 5. async for — 비동기 반복 (스트림 처리)
  • ✅ 6. async with — 비동기 컨텍스트 매니저
  • ✅ 7. asyncio.gather() — 여러 비동기 함수 한꺼번에 실행
  • ✍️ 예제 전체 구조 요약
  • ✅ 요약 테이블
  • 3. 실전 예제 1
  • 4. 실전 예제 2
  • ✅ 전체 구조 요약
  • 🚀 예제 코드: battery, altitude, flight mode 모니터링
  • ✅ 출력 예시 (SITL)
  • 🛠️ 커스터마이징 팁

Python "asyncio" 이해

저작권: 쿼드(QUAD) 드론연구소 https://smartstore.naver.com/maponarooo / Updated: 2025-05-04

1. MAVSDK-Python은 asyncio 기반 라이브러리입니다

즉, 모든 주요 함수는 async def 함수이며, await 키워드로 호출해야 합니다.

✳️ 왜 비동기(asyncio)인가?

드론과 MAVLink 통신은 다음과 같은 특성이 있기 때문입니다:

특성
설명

실시간 통신

센서값, 위치, 상태 등 지속적으로 송수신됨

이벤트 기반

비행 상태 변화, 명령 수신 등 이벤트 발생

I/O 지연 존재

네트워크, UDP, 시리얼 통신 기반으로 딜레이 있음

→ 이러한 상황을 asyncio로 비동기적으로 처리하면 효율적이고 끊김 없는 드론 제어가 가능합니다.

asyncio는 Python에서 비동기(Non-blocking) 프로그래밍을 위한 표준 라이브러리입니다. 드론 통신, 네트워크 요청, 센서 데이터 스트림 등에서 효율적인 I/O 처리에 매우 유용하죠.


2. ayncio 핵심 문법 구조

아래에 asyncio의 핵심 문법 구조를 빠르게 정리해드립니다:

✅ 1. async def 함수 정의

async def my_task():
    print("작업 시작")
    await asyncio.sleep(1)
    print("작업 완료")
  • async def 로 선언된 함수는 비동기 함수

  • await 키워드를 사용하여 다른 async 함수 호출


✅ 2. await 키워드

await asyncio.sleep(1)
  • await은 비동기 함수 호출 시 필수

  • CPU를 블로킹하지 않고 다른 작업을 실행할 기회를 줌


✅ 3. asyncio.run() — 프로그램 진입점

async def main():
    await my_task()

asyncio.run(main())
  • Python 3.7+에서 asyncio.run() 으로 최상위 루프 실행

  • 내부에서 이벤트 루프 생성 및 종료 자동 처리


✅ 4. asyncio.create_task() — 병렬 작업 실행

async def main():
    task1 = asyncio.create_task(my_task())
    task2 = asyncio.create_task(my_task())
    await task1
    await task2
  • 여러 작업을 동시에 실행하고 싶을 때 사용

  • 비동기 함수들의 병렬 실행


✅ 5. async for — 비동기 반복 (스트림 처리)

async def monitor():
    async for data in some_async_generator():
        print(data)
  • 드론 Telemetry, 센서 데이터 등 실시간 스트림 처리에 매우 유용


✅ 6. async with — 비동기 컨텍스트 매니저

async with aiohttp.ClientSession() as session:
    ...
  • 파일, 네트워크, 락 등 리소스를 비동기적으로 열고 닫을 때 사용


✅ 7. asyncio.gather() — 여러 비동기 함수 한꺼번에 실행

results = await asyncio.gather(
    my_task1(),
    my_task2()
)
  • 여러 작업의 결과를 한꺼번에 기다릴 때 사용


✍️ 예제 전체 구조 요약

import asyncio

async def task1():
    await asyncio.sleep(1)
    print("task1 done")

async def task2():
    await asyncio.sleep(2)
    print("task2 done")

async def main():
    await asyncio.gather(task1(), task2())

asyncio.run(main())

✅ 요약 테이블

문법 요소
설명

async def

비동기 함수 정의

await

비동기 함수 호출 시 필요

asyncio.run()

최상위 이벤트 루프 실행

asyncio.create_task()

태스크로 병렬 실행

async for

실시간 데이터 스트림 반복

asyncio.gather()

여러 비동기 함수 병렬 실행

async with

리소스를 비동기로 사용


3. 실전 예제 1

asyncio를 이용한 기본 예제 [Arming - Takeoff - Landing]

🔹 기본 구조

import asyncio
from mavsdk import System

async def run():
    drone = System()
    await drone.connect(system_address="udp://:14540")

    print("연결 대기 중...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print("드론 연결됨!")
            break

    print("Arming...")
    await drone.action.arm()

    print("Takeoff...")
    await drone.action.takeoff()
    await asyncio.sleep(10)

    print("Landing...")
    await drone.action.land()

asyncio.run(run())

async for의 활용: Telemetry 구독

async for position in drone.telemetry.position():
    print(f"위치: {position.latitude_deg}, {position.longitude_deg}")

이 구문은 드론에서 지속적으로 위치 정보를 받아서 처리합니다.


asyncio.sleep() 사용 이유

  • ROS의 rate.sleep() 같은 개념

  • time.sleep()은 블로킹(blocking)이라 사용하면 안 됨

await asyncio.sleep(1)  # 1초 동안 다른 async 작업 가능

병렬 Task 처리 (멀티 async)

task1 = asyncio.create_task(print_position(drone))
task2 = asyncio.create_task(monitor_battery(drone))

await task1
await task2

여러 개의 async def 함수로 나눠서 동시에 실행 가능


에러 핸들링 예시

try:
    await drone.action.takeoff()
except Exception as e:
    print(f"Takeoff 실패: {e}")

4. 실전 예제 2

아래는 asyncio를 활용하여 MAVSDK-Python에서 동시에 ✅ 배터리 상태, ✅ 고도, ✅ 비행 상태(Armed / Disarmed 등) 를 비동기적으로 추적하는 예제입니다.


✅ 전체 구조 요약

  • asyncio.create_task()로 각 telemetry 구독을 개별 태스크로 실행

  • async for로 각각의 스트림을 실시간 처리

  • asyncio.run()으로 비동기 메인 함수 실행


🚀 예제 코드: battery, altitude, flight mode 모니터링

import asyncio
from mavsdk import System


async def monitor_battery(drone):
    async for battery in drone.telemetry.battery():
        print(f"[🔋 Battery] {battery.remaining_percent:.1f}%")

async def monitor_altitude(drone):
    async for position in drone.telemetry.position():
        print(f"[📡 Altitude] {position.relative_altitude_m:.2f} m")

async def monitor_flight_mode(drone):
    async for flight_mode in drone.telemetry.flight_mode():
        print(f"[✈️ Flight Mode] {flight_mode}")

async def run():
    drone = System()
    await drone.connect(system_address="udp://:14540")  # SITL 기본 포트

    print("드론 연결 대기 중...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print("✅ 드론 연결됨!")
            break

    # 동시에 3가지 모니터링 시작
    battery_task = asyncio.create_task(monitor_battery(drone))
    altitude_task = asyncio.create_task(monitor_altitude(drone))
    mode_task = asyncio.create_task(monitor_flight_mode(drone))

    # 30초간 모니터링 후 종료
    await asyncio.sleep(30)

    # 종료 메시지 출력 후 태스크 취소
    battery_task.cancel()
    altitude_task.cancel()
    mode_task.cancel()
    print("⏹️ 모니터링 종료")

asyncio.run(run())

✅ 출력 예시 (SITL)

css복사편집[🔋 Battery] 99.3%
[📡 Altitude] 0.02 m
[✈️ Flight Mode] HOLD
[🔋 Battery] 99.2%
[📡 Altitude] 0.04 m
[✈️ Flight Mode] HOLD
...

🛠️ 커스터마이징 팁

기능
방법

모니터링 시간 늘리기

await asyncio.sleep(30) 값 조절

로그 파일 저장

open('log.txt', 'w') 파일 핸들링 추가

다른 telemetry 항목 추가

drone.telemetry.armed(), drone.telemetry.gps_info() 등

PreviousMAVSDK 라이브러리 클래스NextMAVSDK-Python 샘플 프로그램 분석

Last updated 23 days ago