MCAP(Message Capture)
: mcap은 ros2bag(ROS1에선 rosbag)과 같이 ROS 메시지 데이터를 기록하고 재생하는 데 사용하는 라이브러리입니다.
MCAP vs ros2bag
rosbag은 ROS2의 기본적인 파일 포맷을 사용하는 반면, MCAP 파일 포맷은 데이터를 압축하여 저장할 수 있어 저장 공간을 절약할 수 있습니다. 따라서 대용량 데이터 처리에 적합합니다.
MCAP Install
기본적으로 ROS2가 설치되어있어야 합니다.
1. 플러그인 설치
sudo apt-get install ros-$ROS_DISTRO-rosbag2-storage-mcap
2. mcap 기록 시 스토리지ID 설정
ros2 bag record -s mcap --all
Documentation
https://mcap.dev/guides/getting-started/ros-2
기존 ros2bag (.db3파일)을 mcap 형식으로 변환
mcap convert multiple_files_1.db3 demo.mcap
mcap 읽기 및 쓰기 예제
MCAP은 rosbag2_py API 사용하여 녹화
읽기
1-1. import
import argparse
from rclpy.serialization import deserialize_message
from rosidl_runtime_py.utilities import get_message
from std_msgs.msg import String
import rosbag2_py
1-2. 함수 정의
def read_messages(input_bag: str):
reader = rosbag2_py.SequentialReader()
reader.open(
rosbag2_py.StorageOptions(uri=input_bag, storage_id="mcap"),
rosbag2_py.ConverterOptions(
input_serialization_format="cdr", output_serialization_format="cdr"
),
)
topic_types = reader.get_all_topics_and_types()
def typename(topic_name):
for topic_type in topic_types:
if topic_type.name == topic_name:
return topic_type.type
raise ValueError(f"topic {topic_name} not in bag")
while reader.has_next():
topic, data, timestamp = reader.read_next()
msg_type = get_message(typename(topic))
msg = deserialize_message(data, msg_type)
yield topic, msg, timestamp
del reader
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"input", help="input bag path (folder or filepath) to read from"
)
args = parser.parse_args()
for topic, msg, timestamp in read_messages(args.input):
if isinstance(msg, String):
print(f"{topic} [{timestamp}]: '{msg.data}'")
else:
print(f"{topic} [{timestamp}]: ({type(msg).__name__})")
if __name__ == "__main__":
main()
쓰기
2-1. import
from rclpy.serialization import serialize_message
from std_msgs.msg import String
import rosbag2_py
2-2. mcap 저장소 플러그인을 사용하여 새로운 녹화 기록
writer = rosbag2_py.SequentialWriter()
writer.open(
rosbag2_py.StorageOptions(uri="output.mcap", storage_id="mcap"),
rosbag2_py.ConverterOptions(
input_serialization_format="cdr", output_serialization_format="cdr"
),
)
2-3. rosbag 토픽 생성
writer.create_topic(
rosbag2_py.TopicMetadata(
name="/chatter", type="std_msgs/msg/String", serialization_format="cdr"
)
)
start_time = 0
for i in range(10):
msg = String()
msg.data = f"Chatter #{i}"
timestamp = start_time + (i * 100)
writer.write("/chatter", serialize_message(msg), timestamp)
2-4. 녹화를 닫기 위해 writer 삭제
del writer
반응형
'ROS2' 카테고리의 다른 글
[ROS2] ros2 bag 명령어 사용법 (1) | 2025.01.15 |
---|---|
[ROS2] Nav2란? (4) | 2024.09.09 |
[ROS2] Topics, Services, Actions 차이 (0) | 2024.06.11 |