ultralytics
sleap_io.io.ultralytics
¶
This module handles direct I/O operations for working with Ultralytics YOLO pose format.
Ultralytics YOLO pose format specification: - Directory structure: dataset_root/split/images/ and dataset_root/split/labels/ - Configuration: data.yaml file defining skeleton and dataset structure - Annotation format: Each .txt file contains lines with format: class_id x_center y_center width height x1 y1 v1 x2 y2 v2 ... xn yn vn - Coordinates: Normalized to [0,1] range, origin at top-left - Visibility: 0=not visible, 1=visible but occluded, 2=visible and not occluded
Functions:
Name | Description |
---|---|
create_data_yaml |
Create Ultralytics data.yaml configuration file. |
create_skeleton_from_config |
Create a Skeleton object from Ultralytics configuration. |
create_splits_from_labels |
Create dataset splits from Labels using the built-in splitting functionality. |
denormalize_coordinates |
Denormalize coordinates from [0,1] range to pixel coordinates. |
normalize_coordinates |
Normalize instance point coordinates to [0,1] range. |
parse_data_yaml |
Parse Ultralytics data.yaml configuration file. |
parse_label_file |
Parse a single Ultralytics label file and return instances. |
read_labels |
Read Ultralytics YOLO pose dataset and return a |
write_label_file |
Write a single Ultralytics label file for a frame. |
write_labels |
Write Labels to Ultralytics YOLO pose format. |
create_data_yaml(yaml_path, skeleton, split_ratios)
¶
Create Ultralytics data.yaml configuration file.
Source code in sleap_io/io/ultralytics.py
def create_data_yaml(
yaml_path: Path, skeleton: Skeleton, split_ratios: Dict[str, float]
) -> None:
"""Create Ultralytics data.yaml configuration file."""
# Build skeleton connections
skeleton_connections = []
for edge in skeleton.edges:
src_idx = skeleton.nodes.index(edge.source)
dst_idx = skeleton.nodes.index(edge.destination)
skeleton_connections.append([src_idx, dst_idx])
# Create flip indices (identity mapping by default)
flip_idx = list(range(len(skeleton.nodes)))
config = {
"path": ".",
"names": {0: "animal"},
"kpt_shape": [len(skeleton.nodes), 3],
"flip_idx": flip_idx,
"skeleton": skeleton_connections,
"node_names": [node.name for node in skeleton.nodes],
}
# Add split paths
for split_name in split_ratios.keys():
config[split_name] = f"{split_name}/images"
with open(yaml_path, "w") as f:
yaml.dump(config, f, default_flow_style=False)
create_skeleton_from_config(config)
¶
Create a Skeleton object from Ultralytics configuration.
Source code in sleap_io/io/ultralytics.py
def create_skeleton_from_config(config: Dict) -> Skeleton:
"""Create a Skeleton object from Ultralytics configuration."""
kpt_shape = config.get("kpt_shape", [1, 3])
num_keypoints = kpt_shape[0]
# Create nodes - use generic names if not specified
node_names = config.get("node_names", [f"point_{i}" for i in range(num_keypoints)])
nodes = [Node(name) for name in node_names[:num_keypoints]]
# Create edges from skeleton connections
edges = []
skeleton_connections = config.get("skeleton", [])
for connection in skeleton_connections:
if len(connection) == 2:
src_idx, dst_idx = connection
if 0 <= src_idx < len(nodes) and 0 <= dst_idx < len(nodes):
edges.append(Edge(nodes[src_idx], nodes[dst_idx]))
return Skeleton(nodes=nodes, edges=edges, name="ultralytics_skeleton")
create_splits_from_labels(labels, split_ratios)
¶
Create dataset splits from Labels using the built-in splitting functionality.
Source code in sleap_io/io/ultralytics.py
def create_splits_from_labels(
labels: Labels, split_ratios: Dict[str, float]
) -> Dict[str, Labels]:
"""Create dataset splits from Labels using the built-in splitting functionality."""
split_names = list(split_ratios.keys())
if len(split_names) == 2:
# Two-way split
ratio = split_ratios[split_names[0]]
split1, split2 = labels.split(ratio)
return {split_names[0]: split1, split_names[1]: split2}
elif len(split_names) == 3:
# Three-way split using make_training_splits
train_ratio = split_ratios.get("train", 0.6)
val_ratio = split_ratios.get("val", 0.2)
test_ratio = split_ratios.get("test", 0.2)
try:
train_split, val_split, test_split = labels.make_training_splits(
n_train=train_ratio, n_val=val_ratio, n_test=test_ratio
)
return {"train": train_split, "val": val_split, "test": test_split}
except:
# Fallback to manual splitting
first_split = train_ratio + val_ratio
temp_split, test_split = labels.split(first_split)
train_split, val_split = temp_split.split(train_ratio / first_split)
return {"train": train_split, "val": val_split, "test": test_split}
else:
# Single split or custom splits
return {split_names[0]: labels}
denormalize_coordinates(normalized_points, image_shape)
¶
Denormalize coordinates from [0,1] range to pixel coordinates.
Returns:
Type | Description |
---|---|
ndarray
|
A numpy array of shape (n_points, 3) with columns [x, y, visible]. |
Source code in sleap_io/io/ultralytics.py
def denormalize_coordinates(
normalized_points: List[Tuple[float, float, int]], image_shape: Tuple[int, int]
) -> np.ndarray:
"""Denormalize coordinates from [0,1] range to pixel coordinates.
Returns:
A numpy array of shape (n_points, 3) with columns [x, y, visible].
"""
height, width = image_shape
points = []
for x_norm, y_norm, visibility in normalized_points:
if visibility > 0:
x_px = x_norm * width
y_px = y_norm * height
is_visible = True
else:
x_px = np.nan
y_px = np.nan
is_visible = False
points.append([x_px, y_px, is_visible])
return np.array(points, dtype=np.float32)
normalize_coordinates(instance, image_shape)
¶
Normalize instance point coordinates to [0,1] range.
Source code in sleap_io/io/ultralytics.py
def normalize_coordinates(
instance: Instance, image_shape: Tuple[int, int]
) -> List[Tuple[float, float, int]]:
"""Normalize instance point coordinates to [0,1] range."""
height, width = image_shape
normalized = []
for point in instance.points:
x, y = point["xy"]
if point["visible"] and not np.isnan(x):
x_norm = x / width
y_norm = y / height
visibility = 2 # visible
else:
x_norm = 0.0
y_norm = 0.0
visibility = 0 # not visible
normalized.append((x_norm, y_norm, visibility))
return normalized
parse_data_yaml(yaml_path)
¶
parse_label_file(label_path, skeleton, image_shape)
¶
Parse a single Ultralytics label file and return instances.
Source code in sleap_io/io/ultralytics.py
def parse_label_file(
label_path: Path, skeleton: Skeleton, image_shape: Tuple[int, int]
) -> List[Instance]:
"""Parse a single Ultralytics label file and return instances."""
instances = []
with open(label_path, "r") as f:
for line_num, line in enumerate(f):
line = line.strip()
if not line or line.startswith("#"):
continue
try:
parts = line.split()
if len(parts) < 5:
warnings.warn(
f"Invalid line {line_num} in {label_path}: insufficient data"
)
continue
class_id = int(parts[0])
x_center, y_center, width, height = map(float, parts[1:5])
# Parse keypoints
keypoint_data = parts[5:]
if len(keypoint_data) % 3 != 0:
warnings.warn(
f"Invalid keypoint data in {label_path} line {line_num}"
)
continue
num_keypoints = len(keypoint_data) // 3
if num_keypoints != len(skeleton.nodes):
warnings.warn(
f"Keypoint count mismatch: expected {len(skeleton.nodes)}, "
f"got {num_keypoints} in {label_path} line {line_num}"
)
continue
# Convert normalized coordinates to pixel coordinates
height_px, width_px = image_shape
points = []
for i in range(num_keypoints):
x_norm = float(keypoint_data[i * 3])
y_norm = float(keypoint_data[i * 3 + 1])
visibility = int(keypoint_data[i * 3 + 2])
# Denormalize coordinates
x_px = x_norm * width_px
y_px = y_norm * height_px
# Convert visibility: 0=not visible, 1=occluded, 2=visible
is_visible = visibility > 0
if visibility == 0:
# Not visible - use NaN coordinates
points.append([np.nan, np.nan, False])
else:
points.append([x_px, y_px, is_visible])
# Create instance from numpy array
points_array = np.array(points, dtype=np.float32)
instance = Instance.from_numpy(
points_data=points_array, skeleton=skeleton
)
instances.append(instance)
except (ValueError, IndexError) as e:
warnings.warn(f"Error parsing line {line_num} in {label_path}: {e}")
continue
return instances
read_labels(dataset_path, split='train', skeleton=None, image_size=(480, 640))
¶
Read Ultralytics YOLO pose dataset and return a Labels
object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_path
|
str
|
Path to the Ultralytics dataset root directory containing data.yaml. |
required |
split
|
str
|
Dataset split to read ('train', 'val', or 'test'). Defaults to 'train'. |
'train'
|
skeleton
|
Optional[Skeleton]
|
Optional skeleton to use. If not provided, will be inferred from data.yaml. |
None
|
image_size
|
Tuple[int, int]
|
Image dimensions (height, width) for coordinate denormalization. Defaults to (480, 640). Will attempt to infer from actual images if available. |
(480, 640)
|
Returns:
Type | Description |
---|---|
Labels
|
Parsed labels as a |
Source code in sleap_io/io/ultralytics.py
def read_labels(
dataset_path: str,
split: str = "train",
skeleton: Optional[Skeleton] = None,
image_size: Tuple[int, int] = (480, 640),
) -> Labels:
"""Read Ultralytics YOLO pose dataset and return a `Labels` object.
Args:
dataset_path: Path to the Ultralytics dataset root directory containing data.yaml.
split: Dataset split to read ('train', 'val', or 'test'). Defaults to 'train'.
skeleton: Optional skeleton to use. If not provided, will be inferred from data.yaml.
image_size: Image dimensions (height, width) for coordinate denormalization.
Defaults to (480, 640). Will attempt to infer from actual images if available.
Returns:
Parsed labels as a `Labels` instance.
"""
dataset_path = Path(dataset_path)
# Parse data.yaml configuration
if dataset_path.name == "data.yaml":
# If path already points to data.yaml, use its parent as dataset path
data_yaml_path = dataset_path
dataset_path = dataset_path.parent
else:
data_yaml_path = dataset_path / "data.yaml"
if not data_yaml_path.exists():
raise FileNotFoundError(f"data.yaml not found at {data_yaml_path}")
config = parse_data_yaml(data_yaml_path)
# Use provided skeleton or create from config
if skeleton is None:
skeleton = create_skeleton_from_config(config)
# Get paths for the specified split
split_path = config.get(split, f"{split}/images")
images_dir = dataset_path / split_path
labels_dir = dataset_path / split_path.replace("/images", "/labels")
if not images_dir.exists():
raise FileNotFoundError(f"Images directory not found: {images_dir}")
if not labels_dir.exists():
raise FileNotFoundError(f"Labels directory not found: {labels_dir}")
# Process all image/label pairs
labeled_frames = []
tracks = {} # Track synthetic tracks by instance order
for image_file in sorted(images_dir.glob("*")):
if image_file.suffix.lower() in [".jpg", ".jpeg", ".png", ".tiff", ".bmp"]:
label_file = labels_dir / f"{image_file.stem}.txt"
# Create video object for this image
video = Video.from_filename(str(image_file))
# Parse label file if it exists
instances = []
if label_file.exists():
# Get image dimensions - try from video shape first, fallback to reading image
if video.shape is not None:
img_shape = video.shape[:2]
else:
# Read first frame to get dimensions
img = video[0]
img_shape = img.shape[:2]
instances = parse_label_file(label_file, skeleton, img_shape)
# Assign tracks to instances based on order
for i, instance in enumerate(instances):
track_name = f"track_{i}"
if track_name not in tracks:
tracks[track_name] = Track(name=track_name)
instance.track = tracks[track_name]
# Create labeled frame
frame = LabeledFrame(video=video, frame_idx=0, instances=instances)
labeled_frames.append(frame)
return Labels(
labeled_frames=labeled_frames,
skeletons=[skeleton],
tracks=list(tracks.values()),
provenance={"source": str(dataset_path), "split": split},
)
write_label_file(label_path, frame, skeleton, image_shape, class_id=0)
¶
Write a single Ultralytics label file for a frame.
Source code in sleap_io/io/ultralytics.py
def write_label_file(
label_path: Path,
frame: LabeledFrame,
skeleton: Skeleton,
image_shape: Tuple[int, int],
class_id: int = 0,
) -> None:
"""Write a single Ultralytics label file for a frame."""
height_px, width_px = image_shape
with open(label_path, "w") as f:
for instance in frame.instances:
if len(instance.points) != len(skeleton.nodes):
warnings.warn(
f"Instance has {len(instance.points)} points, "
f"skeleton has {len(skeleton.nodes)} nodes. Skipping."
)
continue
# Calculate bounding box from visible keypoints
visible_mask = instance.points["visible"] & ~np.isnan(
instance.points["xy"][:, 0]
)
if not visible_mask.any():
continue # Skip instances with no visible points
visible_xy = instance.points["xy"][visible_mask]
x_coords = visible_xy[:, 0]
y_coords = visible_xy[:, 1]
x_min, x_max = min(x_coords), max(x_coords)
y_min, y_max = min(y_coords), max(y_coords)
# Add padding to bounding box
padding = 10 # pixels
x_min = max(0, x_min - padding)
y_min = max(0, y_min - padding)
x_max = min(width_px, x_max + padding)
y_max = min(height_px, y_max + padding)
# Convert to normalized YOLO format
x_center_norm = ((x_min + x_max) / 2) / width_px
y_center_norm = ((y_min + y_max) / 2) / height_px
width_norm = (x_max - x_min) / width_px
height_norm = (y_max - y_min) / height_px
# Build line: class_id x_center y_center width height x1 y1 v1 x2 y2 v2 ...
line_parts = [
str(class_id),
f"{x_center_norm:.6f}",
f"{y_center_norm:.6f}",
f"{width_norm:.6f}",
f"{height_norm:.6f}",
]
# Add keypoints
for point in instance.points:
x, y = point["xy"]
if point["visible"] and not np.isnan(x):
x_norm = x / width_px
y_norm = y / height_px
visibility = 2 # visible and not occluded
else:
x_norm = 0.0
y_norm = 0.0
visibility = 0 # not visible
line_parts.extend([f"{x_norm:.6f}", f"{y_norm:.6f}", str(visibility)])
f.write(" ".join(line_parts) + "\n")
write_labels(labels, dataset_path, split_ratios={'train': 0.8, 'val': 0.2}, class_id=0, image_format='png', image_quality=None, verbose=True, use_multiprocessing=False, n_workers=None, **kwargs)
¶
Write Labels to Ultralytics YOLO pose format.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
labels
|
Labels
|
SLEAP Labels object to export. |
required |
dataset_path
|
str
|
Path to write the Ultralytics dataset. |
required |
split_ratios
|
Dict[str, float]
|
Dictionary mapping split names to ratios (must sum to 1.0). |
{'train': 0.8, 'val': 0.2}
|
class_id
|
int
|
Class ID to use for all instances (default: 0). |
0
|
image_format
|
str
|
Image format to use for saving frames. Either "png" (default, lossless) or "jpg". |
'png'
|
image_quality
|
Optional[int]
|
Image quality for JPEG format (1-100). For PNG, this is the compression level (0-9). If None, uses default quality settings. |
None
|
verbose
|
bool
|
If True (default), show progress bars during export. |
True
|
use_multiprocessing
|
bool
|
If True, use multiprocessing for parallel image saving. Default is False. |
False
|
n_workers
|
Optional[int]
|
Number of worker processes. If None, uses CPU count - 1. Only used if use_multiprocessing=True. |
None
|
**kwargs
|
Additional arguments (unused, for compatibility). |
{}
|
Source code in sleap_io/io/ultralytics.py
def write_labels(
labels: Labels,
dataset_path: str,
split_ratios: Dict[str, float] = {"train": 0.8, "val": 0.2},
class_id: int = 0,
image_format: str = "png",
image_quality: Optional[int] = None,
verbose: bool = True,
use_multiprocessing: bool = False,
n_workers: Optional[int] = None,
**kwargs,
) -> None:
"""Write Labels to Ultralytics YOLO pose format.
Args:
labels: SLEAP Labels object to export.
dataset_path: Path to write the Ultralytics dataset.
split_ratios: Dictionary mapping split names to ratios (must sum to 1.0).
class_id: Class ID to use for all instances (default: 0).
image_format: Image format to use for saving frames. Either "png" (default, lossless) or "jpg".
image_quality: Image quality for JPEG format (1-100). For PNG, this is the compression level (0-9).
If None, uses default quality settings.
verbose: If True (default), show progress bars during export.
use_multiprocessing: If True, use multiprocessing for parallel image saving. Default is False.
n_workers: Number of worker processes. If None, uses CPU count - 1. Only used if use_multiprocessing=True.
**kwargs: Additional arguments (unused, for compatibility).
"""
dataset_path = Path(dataset_path)
dataset_path.mkdir(parents=True, exist_ok=True)
# Validate split ratios
total_ratio = sum(split_ratios.values())
if not np.isclose(total_ratio, 1.0):
raise ValueError(f"Split ratios must sum to 1.0, got {total_ratio}")
if len(labels.skeletons) == 0:
raise ValueError("Labels must have at least one skeleton")
skeleton = labels.skeletons[0]
# Create data.yaml configuration
create_data_yaml(dataset_path / "data.yaml", skeleton, split_ratios)
# Split the labels if multiple splits requested
if len(split_ratios) == 1:
split_name = list(split_ratios.keys())[0]
split_labels = {split_name: labels}
else:
split_labels = create_splits_from_labels(labels, split_ratios)
# Write each split
for split_name, split_data in split_labels.items():
images_dir = dataset_path / split_name / "images"
labels_dir = dataset_path / split_name / "labels"
images_dir.mkdir(parents=True, exist_ok=True)
labels_dir.mkdir(parents=True, exist_ok=True)
if use_multiprocessing:
# Prepare frame data for multiprocessing
frame_data_list = []
for lf_idx, frame in enumerate(split_data.labeled_frames):
image_filename = f"{lf_idx:07d}.{image_format}"
image_path = images_dir / image_filename
frame_data = {
"video_path": str(frame.video.filename),
"frame_idx": frame.frame_idx,
"lf_idx": lf_idx,
"output_path": str(image_path),
"frame": frame, # Keep reference for annotation writing
}
frame_data_list.append(frame_data)
# Set up worker pool
if n_workers is None:
n_workers = max(1, multiprocessing.cpu_count() - 1)
# Process frames in parallel
with Pool(processes=n_workers) as pool:
# Create args for each frame
args_list = [
(fd, image_format, image_quality) for fd in frame_data_list
]
# Use imap for progress tracking
if verbose:
results = list(
tqdm(
pool.imap(_save_frame_image, args_list),
total=len(args_list),
desc=f"Writing {split_name} images (parallel)",
)
)
else:
results = pool.map(_save_frame_image, args_list)
# Write annotations for successfully saved images
for frame_data, result in zip(frame_data_list, results):
if result is not None:
frame = frame_data["frame"]
# Get image shape from saved file
img = iio.imread(result)
label_filename = f"{frame_data['lf_idx']:07d}.txt"
label_path = labels_dir / label_filename
write_label_file(
label_path, frame, skeleton, img.shape[:2], class_id
)
else:
# Sequential processing (original implementation)
frame_iterator = (
tqdm(
split_data.labeled_frames,
desc=f"Writing {split_name} images",
disable=not verbose,
)
if verbose
else split_data.labeled_frames
)
for lf_idx, frame in enumerate(frame_iterator):
# Extract frame image
try:
frame_img = frame.image
if frame_img is None:
warnings.warn(
f"Could not load frame {frame.frame_idx} from video, skipping."
)
continue
# Use labeled frame index for filename
image_filename = f"{lf_idx:07d}.{image_format}"
image_path = images_dir / image_filename
# Handle grayscale conversion if needed
if frame_img.ndim == 3 and frame_img.shape[-1] == 1:
# Squeeze single channel dimension for grayscale
frame_img = np.squeeze(frame_img, axis=-1)
# Save image with appropriate quality settings
save_kwargs = {}
if image_format.lower() in ["jpg", "jpeg"]:
if image_quality is not None:
save_kwargs["quality"] = image_quality
elif image_format.lower() == "png":
if image_quality is not None:
# PNG uses compress_level (0-9)
save_kwargs["compress_level"] = min(
9, max(0, image_quality)
)
# Save the image
iio.imwrite(image_path, frame_img, **save_kwargs)
# Save annotations with same base filename
label_filename = f"{lf_idx:07d}.txt"
label_path = labels_dir / label_filename
write_label_file(
label_path, frame, skeleton, frame_img.shape[:2], class_id
)
except Exception as e:
warnings.warn(
f"Error processing frame {frame.frame_idx}: {str(e)}, skipping."
)
continue