Python "asyncio" 이해
저작권: 쿼드(QUAD) 드론연구소 https://smartstore.naver.com/maponarooo / Updated: 2025-05-04
1. MAVSDK-Python은 asyncio
기반 라이브러리입니다
asyncio
기반 라이브러리입니다즉, 모든 주요 함수는 async def
함수이며, await
키워드로 호출해야 합니다.
✳️ 왜 비동기(asyncio)인가?
드론과 MAVLink 통신은 다음과 같은 특성이 있기 때문입니다:
실시간 통신
센서값, 위치, 상태 등 지속적으로 송수신됨
이벤트 기반
비행 상태 변화, 명령 수신 등 이벤트 발생
I/O 지연 존재
네트워크, UDP, 시리얼 통신 기반으로 딜레이 있음
→ 이러한 상황을 asyncio
로 비동기적으로 처리하면 효율적이고 끊김 없는 드론 제어가 가능합니다.
asyncio
는 Python에서 비동기(Non-blocking) 프로그래밍을 위한 표준 라이브러리입니다.
드론 통신, 네트워크 요청, 센서 데이터 스트림 등에서 효율적인 I/O 처리에 매우 유용하죠.
2. ayncio 핵심 문법 구조
아래에 asyncio
의 핵심 문법 구조를 빠르게 정리해드립니다:
✅ 1. async def
함수 정의
async def
함수 정의async def my_task():
print("작업 시작")
await asyncio.sleep(1)
print("작업 완료")
async def
로 선언된 함수는 비동기 함수await
키워드를 사용하여 다른 async 함수 호출
✅ 2. await
키워드
await
키워드await asyncio.sleep(1)
await
은 비동기 함수 호출 시 필수CPU를 블로킹하지 않고 다른 작업을 실행할 기회를 줌
✅ 3. asyncio.run()
— 프로그램 진입점
asyncio.run()
— 프로그램 진입점async def main():
await my_task()
asyncio.run(main())
Python 3.7+에서
asyncio.run()
으로 최상위 루프 실행내부에서 이벤트 루프 생성 및 종료 자동 처리
✅ 4. asyncio.create_task()
— 병렬 작업 실행
asyncio.create_task()
— 병렬 작업 실행async def main():
task1 = asyncio.create_task(my_task())
task2 = asyncio.create_task(my_task())
await task1
await task2
여러 작업을 동시에 실행하고 싶을 때 사용
비동기 함수들의 병렬 실행
✅ 5. async for
— 비동기 반복 (스트림 처리)
async for
— 비동기 반복 (스트림 처리)async def monitor():
async for data in some_async_generator():
print(data)
드론 Telemetry, 센서 데이터 등 실시간 스트림 처리에 매우 유용
✅ 6. async with
— 비동기 컨텍스트 매니저
async with
— 비동기 컨텍스트 매니저async with aiohttp.ClientSession() as session:
...
파일, 네트워크, 락 등 리소스를 비동기적으로 열고 닫을 때 사용
✅ 7. asyncio.gather()
— 여러 비동기 함수 한꺼번에 실행
asyncio.gather()
— 여러 비동기 함수 한꺼번에 실행results = await asyncio.gather(
my_task1(),
my_task2()
)
여러 작업의 결과를 한꺼번에 기다릴 때 사용
✍️ 예제 전체 구조 요약
import asyncio
async def task1():
await asyncio.sleep(1)
print("task1 done")
async def task2():
await asyncio.sleep(2)
print("task2 done")
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
✅ 요약 테이블
async def
비동기 함수 정의
await
비동기 함수 호출 시 필요
asyncio.run()
최상위 이벤트 루프 실행
asyncio.create_task()
태스크로 병렬 실행
async for
실시간 데이터 스트림 반복
asyncio.gather()
여러 비동기 함수 병렬 실행
async with
리소스를 비동기로 사용
3. 실전 예제 1
asyncio를 이용한 기본 예제 [Arming - Takeoff - Landing]
🔹 기본 구조
import asyncio
from mavsdk import System
async def run():
drone = System()
await drone.connect(system_address="udp://:14540")
print("연결 대기 중...")
async for state in drone.core.connection_state():
if state.is_connected:
print("드론 연결됨!")
break
print("Arming...")
await drone.action.arm()
print("Takeoff...")
await drone.action.takeoff()
await asyncio.sleep(10)
print("Landing...")
await drone.action.land()
asyncio.run(run())
async for
의 활용: Telemetry 구독
async for
의 활용: Telemetry 구독async for position in drone.telemetry.position():
print(f"위치: {position.latitude_deg}, {position.longitude_deg}")
이 구문은 드론에서 지속적으로 위치 정보를 받아서 처리합니다.
asyncio.sleep()
사용 이유
asyncio.sleep()
사용 이유ROS의
rate.sleep()
같은 개념time.sleep()
은 블로킹(blocking)이라 사용하면 안 됨
await asyncio.sleep(1) # 1초 동안 다른 async 작업 가능
병렬 Task 처리 (멀티 async)
task1 = asyncio.create_task(print_position(drone))
task2 = asyncio.create_task(monitor_battery(drone))
await task1
await task2
여러 개의 async def
함수로 나눠서 동시에 실행 가능
에러 핸들링 예시
try:
await drone.action.takeoff()
except Exception as e:
print(f"Takeoff 실패: {e}")
4. 실전 예제 2
아래는 asyncio
를 활용하여 MAVSDK-Python에서 동시에
✅ 배터리 상태,
✅ 고도,
✅ 비행 상태(Armed / Disarmed 등)
를 비동기적으로 추적하는 예제입니다.
✅ 전체 구조 요약
asyncio.create_task()
로 각 telemetry 구독을 개별 태스크로 실행async for
로 각각의 스트림을 실시간 처리asyncio.run()
으로 비동기 메인 함수 실행
🚀 예제 코드: battery, altitude, flight mode 모니터링
import asyncio
from mavsdk import System
async def monitor_battery(drone):
async for battery in drone.telemetry.battery():
print(f"[🔋 Battery] {battery.remaining_percent:.1f}%")
async def monitor_altitude(drone):
async for position in drone.telemetry.position():
print(f"[📡 Altitude] {position.relative_altitude_m:.2f} m")
async def monitor_flight_mode(drone):
async for flight_mode in drone.telemetry.flight_mode():
print(f"[✈️ Flight Mode] {flight_mode}")
async def run():
drone = System()
await drone.connect(system_address="udp://:14540") # SITL 기본 포트
print("드론 연결 대기 중...")
async for state in drone.core.connection_state():
if state.is_connected:
print("✅ 드론 연결됨!")
break
# 동시에 3가지 모니터링 시작
battery_task = asyncio.create_task(monitor_battery(drone))
altitude_task = asyncio.create_task(monitor_altitude(drone))
mode_task = asyncio.create_task(monitor_flight_mode(drone))
# 30초간 모니터링 후 종료
await asyncio.sleep(30)
# 종료 메시지 출력 후 태스크 취소
battery_task.cancel()
altitude_task.cancel()
mode_task.cancel()
print("⏹️ 모니터링 종료")
asyncio.run(run())
✅ 출력 예시 (SITL)
css복사편집[🔋 Battery] 99.3%
[📡 Altitude] 0.02 m
[✈️ Flight Mode] HOLD
[🔋 Battery] 99.2%
[📡 Altitude] 0.04 m
[✈️ Flight Mode] HOLD
...
🛠️ 커스터마이징 팁
모니터링 시간 늘리기
await asyncio.sleep(30)
값 조절
로그 파일 저장
open('log.txt', 'w')
파일 핸들링 추가
다른 telemetry 항목 추가
drone.telemetry.armed()
, drone.telemetry.gps_info()
등
Last updated