Data model¶
sleap-io implements the core data structures used in SLEAP for storing data related to multi-instance pose tracking, including data used for annotation, training, and inference.
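For orientation, a minimal loading sketch (assuming the top-level `load_file` helper and a hypothetical file path):

```python
import sleap_io as sio

# Format is inferred from the file extension; the path is hypothetical.
labels = sio.load_file("predictions.slp")
print(labels)  # Labels(labeled_frames=..., videos=..., ...)
```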
sleap_io.Labels
¶
Pose data for a set of videos that have user labels and/or predictions.
Attributes:
Name | Type | Description |
---|---|---|
labeled_frames | list[LabeledFrame] | A list of `LabeledFrame`s that are associated with this dataset. |
videos | list[Video] | A list of `Video`s that are associated with this dataset. Videos do not need to have corresponding `LabeledFrame`s if they do not have any labels or predictions yet. |
skeletons | list[Skeleton] | A list of `Skeleton`s that are associated with this dataset. This should generally only contain a single skeleton. |
tracks | list[Track] | A list of `Track`s that are associated with this dataset. |
suggestions | list[SuggestionFrame] | A list of `SuggestionFrame`s that are associated with this dataset. |
provenance | dict[str, Any] | Dictionary of arbitrary metadata providing additional information about where the dataset came from. |
Notes
`Video`s in contained `LabeledFrame`s, as well as `Skeleton`s and `Track`s in contained `Instance`s, are added to the respective lists automatically.
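A minimal construction sketch (the `Skeleton` and `Video` constructors and the file path are assumptions not documented in this section):

```python
import numpy as np
import sleap_io as sio

skeleton = sio.Skeleton(["head", "thorax", "abdomen"])
video = sio.Video.from_filename("video.mp4")  # hypothetical path

instance = sio.Instance.from_numpy(
    points=np.array([[10.2, 20.4], [5.8, 15.1], [0.3, 10.6]]),
    skeleton=skeleton,
)
frame = sio.LabeledFrame(video=video, frame_idx=0, instances=[instance])

labels = sio.Labels(labeled_frames=[frame])
labels.videos     # [video] -- collected automatically from the labeled frames
labels.skeletons  # [skeleton]
```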
Source code in sleap_io/model/labels.py
@define
class Labels:
"""Pose data for a set of videos that have user labels and/or predictions.
Attributes:
labeled_frames: A list of `LabeledFrame`s that are associated with this dataset.
videos: A list of `Video`s that are associated with this dataset. Videos do not
need to have corresponding `LabeledFrame`s if they do not have any
labels or predictions yet.
skeletons: A list of `Skeleton`s that are associated with this dataset. This
should generally only contain a single skeleton.
tracks: A list of `Track`s that are associated with this dataset.
suggestions: A list of `SuggestionFrame`s that are associated with this dataset.
provenance: Dictionary of arbitrary metadata providing additional information
about where the dataset came from.
Notes:
`Video`s in contained `LabeledFrame`s, and `Skeleton`s and `Track`s in contained
`Instance`s are added to the respective lists automatically.
"""
labeled_frames: list[LabeledFrame] = field(factory=list)
videos: list[Video] = field(factory=list)
skeletons: list[Skeleton] = field(factory=list)
tracks: list[Track] = field(factory=list)
suggestions: list[SuggestionFrame] = field(factory=list)
provenance: dict[str, Any] = field(factory=dict)
def __attrs_post_init__(self):
"""Append videos, skeletons, and tracks seen in `labeled_frames` to `Labels`."""
self.update()
def update(self):
"""Update data structures based on contents.
This function will update the list of skeletons, videos and tracks from the
labeled frames, instances and suggestions.
"""
for lf in self.labeled_frames:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
for sf in self.suggestions:
if sf.video not in self.videos:
self.videos.append(sf.video)
def __getitem__(
self, key: int | slice | list[int] | np.ndarray | tuple[Video, int]
) -> list[LabeledFrame] | LabeledFrame:
"""Return one or more labeled frames based on indexing criteria."""
if type(key) == int:
return self.labeled_frames[key]
elif type(key) == slice:
return [self.labeled_frames[i] for i in range(*key.indices(len(self)))]
elif type(key) == list:
return [self.labeled_frames[i] for i in key]
elif isinstance(key, np.ndarray):
return [self.labeled_frames[i] for i in key.tolist()]
elif type(key) == tuple and len(key) == 2:
video, frame_idx = key
res = self.find(video, frame_idx)
if len(res) == 1:
return res[0]
elif len(res) == 0:
raise IndexError(
f"No labeled frames found for video {video} and "
f"frame index {frame_idx}."
)
elif type(key) == Video:
res = self.find(key)
if len(res) == 0:
raise IndexError(f"No labeled frames found for video {key}.")
return res
else:
raise IndexError(f"Invalid indexing argument for labels: {key}")
def __iter__(self):
"""Iterate over `labeled_frames` list when calling iter method on `Labels`."""
return iter(self.labeled_frames)
def __len__(self) -> int:
"""Return number of labeled frames."""
return len(self.labeled_frames)
def __repr__(self) -> str:
"""Return a readable representation of the labels."""
return (
"Labels("
f"labeled_frames={len(self.labeled_frames)}, "
f"videos={len(self.videos)}, "
f"skeletons={len(self.skeletons)}, "
f"tracks={len(self.tracks)}, "
f"suggestions={len(self.suggestions)}"
")"
)
def __str__(self) -> str:
"""Return a readable representation of the labels."""
return self.__repr__()
def append(self, lf: LabeledFrame, update: bool = True):
"""Append a labeled frame to the labels.
Args:
lf: A labeled frame to add to the labels.
update: If `True` (the default), update list of videos, tracks and
skeletons from the contents.
"""
self.labeled_frames.append(lf)
if update:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
def extend(self, lfs: list[LabeledFrame], update: bool = True):
"""Append a labeled frame to the labels.
Args:
lfs: A list of labeled frames to add to the labels.
update: If `True` (the default), update list of videos, tracks and
skeletons from the contents.
"""
self.labeled_frames.extend(lfs)
if update:
for lf in lfs:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
def numpy(
self,
video: Optional[Union[Video, int]] = None,
all_frames: bool = True,
untracked: bool = False,
return_confidence: bool = False,
) -> np.ndarray:
"""Construct a numpy array from instance points.
Args:
video: Video or video index to convert to numpy arrays. If `None` (the
default), uses the first video.
untracked: If `False` (the default), include only instances that have a
track assignment. If `True`, includes all instances in each frame in
arbitrary order.
return_confidence: If `False` (the default), only return points of nodes. If
`True`, return the points and scores of nodes.
Returns:
An array of tracks of shape `(n_frames, n_tracks, n_nodes, 2)` if
`return_confidence` is `False`. Otherwise returned shape is
`(n_frames, n_tracks, n_nodes, 3)` if `return_confidence` is `True`.
Missing data will be replaced with `np.nan`.
If this is a single instance project, a track does not need to be assigned.
Only predicted instances (NOT user instances) will be returned.
Notes:
This method assumes that instances have tracks assigned and is intended to
function primarily for single-video prediction results.
"""
# Get labeled frames for specified video.
if video is None:
video = 0
if type(video) == int:
video = self.videos[video]
lfs = [lf for lf in self.labeled_frames if lf.video == video]
# Figure out frame index range.
first_frame, last_frame = 0, 0
for lf in lfs:
first_frame = min(first_frame, lf.frame_idx)
last_frame = max(last_frame, lf.frame_idx)
# Figure out the number of tracks based on number of instances in each frame.
# First, let's check the max number of predicted instances (regardless of
# whether they're tracked).
n_preds = 0
for lf in lfs:
n_pred_instances = len(lf.predicted_instances)
n_preds = max(n_preds, n_pred_instances)
# Case 1: We don't care about order because there's only 1 instance per frame,
# or we're considering untracked instances.
untracked = untracked or n_preds == 1
if untracked:
n_tracks = n_preds
else:
# Case 2: We're considering only tracked instances.
n_tracks = len(self.tracks)
n_frames = int(last_frame - first_frame + 1)
skeleton = self.skeletons[-1] # Assume project only uses last skeleton
n_nodes = len(skeleton.nodes)
if return_confidence:
tracks = np.full((n_frames, n_tracks, n_nodes, 3), np.nan, dtype="float32")
else:
tracks = np.full((n_frames, n_tracks, n_nodes, 2), np.nan, dtype="float32")
for lf in lfs:
i = int(lf.frame_idx - first_frame)
if untracked:
for j, inst in enumerate(lf.predicted_instances):
tracks[i, j] = inst.numpy(scores=return_confidence)
else:
tracked_instances = [
inst
for inst in lf.instances
if type(inst) == PredictedInstance and inst.track is not None
]
for inst in tracked_instances:
j = self.tracks.index(inst.track) # type: ignore[arg-type]
tracks[i, j] = inst.numpy(scores=return_confidence)
return tracks
@property
def video(self) -> Video:
"""Return the video if there is only a single video in the labels."""
if len(self.videos) == 0:
raise ValueError("There are no videos in the labels.")
elif len(self.videos) == 1:
return self.videos[0]
else:
raise ValueError(
"Labels.video can only be used when there is only a single video saved "
"in the labels. Use Labels.videos instead."
)
@property
def skeleton(self) -> Skeleton:
"""Return the skeleton if there is only a single skeleton in the labels."""
if len(self.skeletons) == 0:
raise ValueError("There are no skeletons in the labels.")
elif len(self.skeletons) == 1:
return self.skeletons[0]
else:
raise ValueError(
"Labels.skeleton can only be used when there is only a single skeleton "
"saved in the labels. Use Labels.skeletons instead."
)
def find(
self,
video: Video,
frame_idx: int | list[int] | None = None,
return_new: bool = False,
) -> list[LabeledFrame]:
"""Search for labeled frames given video and/or frame index.
Args:
video: A `Video` that is associated with the project.
frame_idx: The frame index (or indices) which we want to find in the video.
If a range is specified, we'll return all frames with indices in that
range. If not specified, then we'll return all labeled frames for the video.
return_new: Whether to return singleton of new and empty `LabeledFrame` if
none are found in project.
Returns:
List of `LabeledFrame` objects that match the criteria.
The list will be empty if no matches found, unless return_new is True, in
which case it contains new (empty) `LabeledFrame` objects with `video` and
`frame_index` set.
"""
results = []
if frame_idx is None:
for lf in self.labeled_frames:
if lf.video == video:
results.append(lf)
return results
if np.isscalar(frame_idx):
frame_idx = np.array(frame_idx).reshape(-1)
for frame_ind in frame_idx:
result = None
for lf in self.labeled_frames:
if lf.video == video and lf.frame_idx == frame_ind:
result = lf
results.append(result)
break
if result is None and return_new:
results.append(LabeledFrame(video=video, frame_idx=frame_ind))
return results
def save(
self,
filename: str,
format: Optional[str] = None,
embed: bool | str | list[tuple[Video, int]] | None = None,
**kwargs,
):
"""Save labels to file in specified format.
Args:
filename: Path to save labels to.
format: The format to save the labels in. If `None`, the format will be
inferred from the file extension. Available formats are `"slp"`,
`"nwb"`, `"labelstudio"`, and `"jabs"`.
embed: Frames to embed in the saved labels file. One of `None`, `True`,
`"all"`, `"user"`, `"suggestions"`, `"user+suggestions"`, `"source"` or
list of tuples of `(video, frame_idx)`.
If `None` is specified (the default) and the labels contains embedded
frames, those embedded frames will be re-saved to the new file.
If `True` or `"all"`, all labeled frames and suggested frames will be
embedded.
If `"source"` is specified, no images will be embedded and the source
video will be restored if available.
This argument is only valid for the SLP backend.
"""
from sleap_io import save_file
save_file(self, filename, format=format, embed=embed, **kwargs)
def clean(
self,
frames: bool = True,
empty_instances: bool = False,
skeletons: bool = True,
tracks: bool = True,
videos: bool = False,
):
"""Remove empty frames, unused skeletons, tracks and videos.
Args:
frames: If `True` (the default), remove empty frames.
empty_instances: If `True` (NOT default), remove instances that have no
visible points.
skeletons: If `True` (the default), remove unused skeletons.
tracks: If `True` (the default), remove unused tracks.
videos: If `True` (NOT default), remove videos that have no labeled frames.
"""
used_skeletons = []
used_tracks = []
used_videos = []
kept_frames = []
for lf in self.labeled_frames:
if empty_instances:
lf.remove_empty_instances()
if frames and len(lf) == 0:
continue
if videos and lf.video not in used_videos:
used_videos.append(lf.video)
if skeletons or tracks:
for inst in lf:
if skeletons and inst.skeleton not in used_skeletons:
used_skeletons.append(inst.skeleton)
if (
tracks
and inst.track is not None
and inst.track not in used_tracks
):
used_tracks.append(inst.track)
if frames:
kept_frames.append(lf)
if videos:
self.videos = [video for video in self.videos if video in used_videos]
if skeletons:
self.skeletons = [
skeleton for skeleton in self.skeletons if skeleton in used_skeletons
]
if tracks:
self.tracks = [track for track in self.tracks if track in used_tracks]
if frames:
self.labeled_frames = kept_frames
def remove_predictions(self, clean: bool = True):
"""Remove all predicted instances from the labels.
Args:
clean: If `True` (the default), also remove any empty frames and unused
tracks and skeletons. It does NOT remove videos that have no labeled
frames or instances with no visible points.
See also: `Labels.clean`
"""
for lf in self.labeled_frames:
lf.remove_predictions()
if clean:
self.clean(
frames=True,
empty_instances=False,
skeletons=True,
tracks=True,
videos=False,
)
@property
def user_labeled_frames(self) -> list[LabeledFrame]:
"""Return all labeled frames with user (non-predicted) instances."""
return [lf for lf in self.labeled_frames if lf.has_user_instances]
def replace_videos(
self,
old_videos: list[Video] | None = None,
new_videos: list[Video] | None = None,
video_map: dict[Video, Video] | None = None,
):
"""Replace videos and update all references.
Args:
old_videos: List of videos to be replaced.
new_videos: List of videos to replace with.
video_map: Alternative input of dictionary where keys are the old videos and
values are the new videos.
"""
if video_map is None:
video_map = {o: n for o, n in zip(old_videos, new_videos)}
# Update the labeled frames with the new videos.
for lf in self.labeled_frames:
if lf.video in video_map:
lf.video = video_map[lf.video]
# Update suggestions with the new videos.
for sf in self.suggestions:
if sf.video in video_map:
sf.video = video_map[sf.video]
def replace_filenames(
self,
new_filenames: list[str | Path] | None = None,
filename_map: dict[str | Path, str | Path] | None = None,
prefix_map: dict[str | Path, str | Path] | None = None,
):
"""Replace video filenames.
Args:
new_filenames: List of new filenames. Must have the same length as the
number of videos in the labels.
filename_map: Dictionary mapping old filenames (keys) to new filenames
(values).
prefix_map: Dictionary mapping old prefixes (keys) to new prefixes (values).
Notes:
Only one of the argument types can be provided.
"""
n = 0
if new_filenames is not None:
n += 1
if filename_map is not None:
n += 1
if prefix_map is not None:
n += 1
if n != 1:
raise ValueError(
"Exactly one input method must be provided to replace filenames."
)
if new_filenames is not None:
if len(self.videos) != len(new_filenames):
raise ValueError(
f"Number of new filenames ({len(new_filenames)}) does not match "
f"the number of videos ({len(self.videos)})."
)
for video, new_filename in zip(self.videos, new_filenames):
video.replace_filename(new_filename)
elif filename_map is not None:
for video in self.videos:
for old_fn, new_fn in filename_map.items():
if type(video.filename) == list:
new_fns = []
for fn in video.filename:
if Path(fn) == Path(old_fn):
new_fns.append(new_fn)
else:
new_fns.append(fn)
video.replace_filename(new_fns)
else:
if Path(video.filename) == Path(old_fn):
video.replace_filename(new_fn)
elif prefix_map is not None:
for video in self.videos:
for old_prefix, new_prefix in prefix_map.items():
old_prefix, new_prefix = Path(old_prefix), Path(new_prefix)
if type(video.filename) == list:
new_fns = []
for fn in video.filename:
fn = Path(fn)
if fn.as_posix().startswith(old_prefix.as_posix()):
new_fns.append(new_prefix / fn.relative_to(old_prefix))
else:
new_fns.append(fn)
video.replace_filename(new_fns)
else:
fn = Path(video.filename)
if fn.as_posix().startswith(old_prefix.as_posix()):
video.replace_filename(
new_prefix / fn.relative_to(old_prefix)
)
def split(self, n: int | float, seed: int | None = None) -> tuple[Labels, Labels]:
"""Separate the labels into random splits.
Args:
n: Size of the first split. If integer >= 1, assumes that this is the number
of labeled frames in the first split. If < 1.0, this will be treated as
a fraction of the total labeled frames.
seed: Optional integer seed to use for reproducibility.
Returns:
A tuple of `split1, split2`.
If an integer was specified, `len(split1) == n`.
If a fraction was specified, `len(split1) == int(n * len(labels))`.
The second split contains the remainder, i.e.,
`len(split2) == len(labels) - len(split1)`.
If there are too few frames, a minimum of 1 frame will be kept in the second
split.
If there is exactly 1 labeled frame in the labels, the same frame will be
assigned to both splits.
"""
n0 = len(self)
if n0 == 0:
return self, self
n1 = n
if n < 1.0:
n1 = max(int(n0 * float(n)), 1)
n2 = max(n0 - n1, 1)
n1, n2 = int(n1), int(n2)
rng = np.random.default_rng(seed=seed)
inds1 = rng.choice(n0, size=(n1,), replace=False)
if n0 == 1:
inds2 = np.array([0])
else:
inds2 = np.setdiff1d(np.arange(n0), inds1)
split1, split2 = self[inds1], self[inds2]
split1, split2 = deepcopy(split1), deepcopy(split2)
split1, split2 = Labels(split1), Labels(split2)
split1.provenance = self.provenance
split2.provenance = self.provenance
split1.provenance["source_labels"] = self.provenance.get("filename", None)
split2.provenance["source_labels"] = self.provenance.get("filename", None)
return split1, split2
def make_training_splits(
self,
n_train: int | float,
n_val: int | float | None = None,
n_test: int | float | None = None,
save_dir: str | Path | None = None,
seed: int | None = None,
) -> tuple[Labels, Labels] | tuple[Labels, Labels, Labels]:
"""Make splits for training with embedded images.
Args:
n_train: Size of the training split as integer or fraction.
n_val: Size of the validation split as integer or fraction. If `None`,
this will be inferred based on the values of `n_train` and `n_test`. If
`n_test` is `None`, this will be the remainder of the data after the
training split.
n_test: Size of the testing split as integer or fraction. If `None`, the
test split will not be saved.
save_dir: If specified, save splits to SLP files with embedded images.
seed: Optional integer seed to use for reproducibility.
Returns:
A tuple of `labels_train, labels_val` or
`labels_train, labels_val, labels_test` if `n_test` was specified.
Notes:
Predictions and suggestions will be removed before saving, leaving only
frames with user labeled data (the source labels are not affected).
Frames with user labeled data will be embedded in the resulting files.
If `save_dir` is specified, this will save the randomly sampled splits to:
- `{save_dir}/train.pkg.slp`
- `{save_dir}/val.pkg.slp`
- `{save_dir}/test.pkg.slp` (if `n_test` is specified)
See also: `Labels.split`
"""
# Clean up labels.
labels = deepcopy(self)
labels.remove_predictions()
labels.suggestions = []
labels.clean()
# Make splits.
labels_train, labels_rest = labels.split(n_train, seed=seed)
if n_test is not None:
if n_test < 1:
n_test = (n_test * len(labels)) / len(labels_rest)
labels_test, labels_rest = labels_rest.split(n=n_test, seed=seed)
if n_val is not None:
if n_val < 1:
n_val = (n_val * len(labels)) / len(labels_rest)
labels_val, _ = labels_rest.split(n=n_val, seed=seed)
else:
labels_val = labels_rest
# Save.
if save_dir is not None:
save_dir = Path(save_dir)
save_dir.mkdir(exist_ok=True, parents=True)
labels_train.save(save_dir / "train.pkg.slp", embed="user")
labels_val.save(save_dir / "val.pkg.slp", embed="user")
labels_test.save(save_dir / "test.pkg.slp", embed="user")
if n_test is None:
return labels_train, labels_val
else:
return labels_train, labels_val, labels_test
skeleton: Skeleton
property
¶
Return the skeleton if there is only a single skeleton in the labels.
user_labeled_frames: list[LabeledFrame]
property
¶
Return all labeled frames with user (non-predicted) instances.
video: Video
property
¶
Return the video if there is only a single video in the labels.
__attrs_post_init__()
¶
Append videos, skeletons, and tracks seen in `labeled_frames` to `Labels`.
__getitem__(key)
¶
Return one or more labeled frames based on indexing criteria.
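A brief indexing sketch, assuming a single-video `labels` object whose frame 0 is labeled:

```python
lf = labels[0]                   # a single LabeledFrame
first_ten = labels[:10]          # a list of LabeledFrames
lf = labels[(labels.video, 0)]   # frame 0 of a specific video
in_video = labels[labels.video]  # all labeled frames in that video
```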
Source code in sleap_io/model/labels.py
def __getitem__(
self, key: int | slice | list[int] | np.ndarray | tuple[Video, int]
) -> list[LabeledFrame] | LabeledFrame:
"""Return one or more labeled frames based on indexing criteria."""
if type(key) == int:
return self.labeled_frames[key]
elif type(key) == slice:
return [self.labeled_frames[i] for i in range(*key.indices(len(self)))]
elif type(key) == list:
return [self.labeled_frames[i] for i in key]
elif isinstance(key, np.ndarray):
return [self.labeled_frames[i] for i in key.tolist()]
elif type(key) == tuple and len(key) == 2:
video, frame_idx = key
res = self.find(video, frame_idx)
if len(res) == 1:
return res[0]
elif len(res) == 0:
raise IndexError(
f"No labeled frames found for video {video} and "
f"frame index {frame_idx}."
)
elif type(key) == Video:
res = self.find(key)
if len(res) == 0:
raise IndexError(f"No labeled frames found for video {key}.")
return res
else:
raise IndexError(f"Invalid indexing argument for labels: {key}")
__iter__()
¶
Iterate over `labeled_frames` list when calling iter method on `Labels`.
__len__()
¶
Return number of labeled frames.
__repr__()
¶
Return a readable representation of the labels.
__str__()
¶
Return a readable representation of the labels.
append(lf, update=True)
¶
Append a labeled frame to the labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lf | LabeledFrame | A labeled frame to add to the labels. | required |
update | bool | If `True` (the default), update the lists of videos, tracks and skeletons from the contents. | True |
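A short sketch, assuming an existing `labels` object with at least one video:

```python
lf = sio.LabeledFrame(video=labels.video, frame_idx=42)
labels.append(lf)  # update=True (default) also registers new videos/skeletons/tracks
```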
Source code in sleap_io/model/labels.py
def append(self, lf: LabeledFrame, update: bool = True):
"""Append a labeled frame to the labels.
Args:
lf: A labeled frame to add to the labels.
update: If `True` (the default), update list of videos, tracks and
skeletons from the contents.
"""
self.labeled_frames.append(lf)
if update:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
clean(frames=True, empty_instances=False, skeletons=True, tracks=True, videos=False)
¶
Remove empty frames, unused skeletons, tracks and videos.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frames | bool | If `True` (the default), remove empty frames. | True |
empty_instances | bool | If `True` (NOT default), remove instances that have no visible points. | False |
skeletons | bool | If `True` (the default), remove unused skeletons. | True |
tracks | bool | If `True` (the default), remove unused tracks. | True |
videos | bool | If `True` (NOT default), remove videos that have no labeled frames. | False |
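A short usage sketch on an existing `labels` object:

```python
labels.clean()  # drop empty frames and unused skeletons/tracks (defaults)
labels.clean(empty_instances=True, videos=True)  # also drop empty instances and unused videos
```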
Source code in sleap_io/model/labels.py
def clean(
self,
frames: bool = True,
empty_instances: bool = False,
skeletons: bool = True,
tracks: bool = True,
videos: bool = False,
):
"""Remove empty frames, unused skeletons, tracks and videos.
Args:
frames: If `True` (the default), remove empty frames.
empty_instances: If `True` (NOT default), remove instances that have no
visible points.
skeletons: If `True` (the default), remove unused skeletons.
tracks: If `True` (the default), remove unused tracks.
videos: If `True` (NOT default), remove videos that have no labeled frames.
"""
used_skeletons = []
used_tracks = []
used_videos = []
kept_frames = []
for lf in self.labeled_frames:
if empty_instances:
lf.remove_empty_instances()
if frames and len(lf) == 0:
continue
if videos and lf.video not in used_videos:
used_videos.append(lf.video)
if skeletons or tracks:
for inst in lf:
if skeletons and inst.skeleton not in used_skeletons:
used_skeletons.append(inst.skeleton)
if (
tracks
and inst.track is not None
and inst.track not in used_tracks
):
used_tracks.append(inst.track)
if frames:
kept_frames.append(lf)
if videos:
self.videos = [video for video in self.videos if video in used_videos]
if skeletons:
self.skeletons = [
skeleton for skeleton in self.skeletons if skeleton in used_skeletons
]
if tracks:
self.tracks = [track for track in self.tracks if track in used_tracks]
if frames:
self.labeled_frames = kept_frames
extend(lfs, update=True)
¶
Append a list of labeled frames to the labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lfs | list[LabeledFrame] | A list of labeled frames to add to the labels. | required |
update | bool | If `True` (the default), update the lists of videos, tracks and skeletons from the contents. | True |
Source code in sleap_io/model/labels.py
def extend(self, lfs: list[LabeledFrame], update: bool = True):
"""Append a labeled frame to the labels.
Args:
lfs: A list of labeled frames to add to the labels.
update: If `True` (the default), update list of videos, tracks and
skeletons from the contents.
"""
self.labeled_frames.extend(lfs)
if update:
for lf in lfs:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
find(video, frame_idx=None, return_new=False)
¶
Search for labeled frames given video and/or frame index.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video | Video | A `Video` that is associated with the project. | required |
frame_idx | int \| list[int] \| None | The frame index (or indices) which we want to find in the video. If a range is specified, we'll return all frames with indices in that range. If not specified, we'll return all labeled frames for the video. | None |
return_new | bool | Whether to return a singleton list containing a new and empty `LabeledFrame` if none are found in the project. | False |
Returns:
Type | Description |
---|---|
list[LabeledFrame] | List of `LabeledFrame` objects that match the criteria. The list will be empty if no matches are found, unless `return_new` is `True`, in which case it contains new (empty) `LabeledFrame` objects with `video` and `frame_idx` set. |
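A short usage sketch, assuming a single-video `labels` object:

```python
lfs = labels.find(labels.video)                    # all labeled frames in the video
lfs = labels.find(labels.video, frame_idx=[0, 5])  # only frames 0 and 5, if labeled
lfs = labels.find(labels.video, frame_idx=3, return_new=True)  # new empty frame if missing
```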
Source code in sleap_io/model/labels.py
def find(
self,
video: Video,
frame_idx: int | list[int] | None = None,
return_new: bool = False,
) -> list[LabeledFrame]:
"""Search for labeled frames given video and/or frame index.
Args:
video: A `Video` that is associated with the project.
frame_idx: The frame index (or indices) which we want to find in the video.
If a range is specified, we'll return all frames with indices in that
range. If not specified, then we'll return all labeled frames for the video.
return_new: Whether to return singleton of new and empty `LabeledFrame` if
none are found in project.
Returns:
List of `LabeledFrame` objects that match the criteria.
The list will be empty if no matches found, unless return_new is True, in
which case it contains new (empty) `LabeledFrame` objects with `video` and
`frame_index` set.
"""
results = []
if frame_idx is None:
for lf in self.labeled_frames:
if lf.video == video:
results.append(lf)
return results
if np.isscalar(frame_idx):
frame_idx = np.array(frame_idx).reshape(-1)
for frame_ind in frame_idx:
result = None
for lf in self.labeled_frames:
if lf.video == video and lf.frame_idx == frame_ind:
result = lf
results.append(result)
break
if result is None and return_new:
results.append(LabeledFrame(video=video, frame_idx=frame_ind))
return results
make_training_splits(n_train, n_val=None, n_test=None, save_dir=None, seed=None)
¶
Make splits for training with embedded images.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_train | int \| float | Size of the training split as an integer or fraction. | required |
n_val | int \| float \| None | Size of the validation split as an integer or fraction. If `None`, this will be inferred based on the values of `n_train` and `n_test`. If `n_test` is `None`, this will be the remainder of the data after the training split. | None |
n_test | int \| float \| None | Size of the testing split as an integer or fraction. If `None`, the test split will not be saved. | None |
save_dir | str \| Path \| None | If specified, save splits to SLP files with embedded images. | None |
seed | int \| None | Optional integer seed to use for reproducibility. | None |
Returns:
Type | Description |
---|---|
tuple[Labels, Labels] \| tuple[Labels, Labels, Labels] | A tuple of `labels_train, labels_val`, or `labels_train, labels_val, labels_test` if `n_test` was specified. |
Notes
Predictions and suggestions will be removed before saving, leaving only frames with user labeled data (the source labels are not affected).
Frames with user labeled data will be embedded in the resulting files.
If `save_dir` is specified, this will save the randomly sampled splits to:
- `{save_dir}/train.pkg.slp`
- `{save_dir}/val.pkg.slp`
- `{save_dir}/test.pkg.slp` (if `n_test` is specified)
See also: Labels.split
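A usage sketch with a hypothetical output directory:

```python
train, val, test = labels.make_training_splits(
    n_train=0.8, n_val=0.1, n_test=0.1, save_dir="splits/", seed=42
)
# Saves splits/train.pkg.slp, splits/val.pkg.slp and splits/test.pkg.slp
# with user-labeled frames embedded.
```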
Source code in sleap_io/model/labels.py
def make_training_splits(
self,
n_train: int | float,
n_val: int | float | None = None,
n_test: int | float | None = None,
save_dir: str | Path | None = None,
seed: int | None = None,
) -> tuple[Labels, Labels] | tuple[Labels, Labels, Labels]:
"""Make splits for training with embedded images.
Args:
n_train: Size of the training split as integer or fraction.
n_val: Size of the validation split as integer or fraction. If `None`,
this will be inferred based on the values of `n_train` and `n_test`. If
`n_test` is `None`, this will be the remainder of the data after the
training split.
n_test: Size of the testing split as integer or fraction. If `None`, the
test split will not be saved.
save_dir: If specified, save splits to SLP files with embedded images.
seed: Optional integer seed to use for reproducibility.
Returns:
A tuple of `labels_train, labels_val` or
`labels_train, labels_val, labels_test` if `n_test` was specified.
Notes:
Predictions and suggestions will be removed before saving, leaving only
frames with user labeled data (the source labels are not affected).
Frames with user labeled data will be embedded in the resulting files.
If `save_dir` is specified, this will save the randomly sampled splits to:
- `{save_dir}/train.pkg.slp`
- `{save_dir}/val.pkg.slp`
- `{save_dir}/test.pkg.slp` (if `n_test` is specified)
See also: `Labels.split`
"""
# Clean up labels.
labels = deepcopy(self)
labels.remove_predictions()
labels.suggestions = []
labels.clean()
# Make splits.
labels_train, labels_rest = labels.split(n_train, seed=seed)
if n_test is not None:
if n_test < 1:
n_test = (n_test * len(labels)) / len(labels_rest)
labels_test, labels_rest = labels_rest.split(n=n_test, seed=seed)
if n_val is not None:
if n_val < 1:
n_val = (n_val * len(labels)) / len(labels_rest)
labels_val, _ = labels_rest.split(n=n_val, seed=seed)
else:
labels_val = labels_rest
# Save.
if save_dir is not None:
save_dir = Path(save_dir)
save_dir.mkdir(exist_ok=True, parents=True)
labels_train.save(save_dir / "train.pkg.slp", embed="user")
labels_val.save(save_dir / "val.pkg.slp", embed="user")
labels_test.save(save_dir / "test.pkg.slp", embed="user")
if n_test is None:
return labels_train, labels_val
else:
return labels_train, labels_val, labels_test
numpy(video=None, all_frames=True, untracked=False, return_confidence=False)
¶
Construct a numpy array from instance points.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
video | Optional[Union[Video, int]] | Video or video index to convert to numpy arrays. If `None` (the default), uses the first video. | None |
untracked | bool | If `False` (the default), include only instances that have a track assignment. If `True`, includes all instances in each frame in arbitrary order. | False |
return_confidence | bool | If `False` (the default), only return points of nodes. If `True`, return the points and scores of nodes. | False |
Returns:
Type | Description |
---|---|
ndarray | An array of tracks of shape `(n_frames, n_tracks, n_nodes, 2)` if `return_confidence` is `False`, or `(n_frames, n_tracks, n_nodes, 3)` if `return_confidence` is `True`. Missing data will be replaced with `np.nan`. If this is a single instance project, a track does not need to be assigned. Only predicted instances (NOT user instances) will be returned. |
Notes
This method assumes that instances have tracks assigned and is intended to function primarily for single-video prediction results.
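A short sketch on an existing `labels` object with predictions:

```python
tracks = labels.numpy(video=0, return_confidence=True)
# tracks.shape == (n_frames, n_tracks, n_nodes, 3); missing data is np.nan
xy = tracks[..., :2]     # x, y coordinates
scores = tracks[..., 2]  # per-point confidences
```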
Source code in sleap_io/model/labels.py
def numpy(
self,
video: Optional[Union[Video, int]] = None,
all_frames: bool = True,
untracked: bool = False,
return_confidence: bool = False,
) -> np.ndarray:
"""Construct a numpy array from instance points.
Args:
video: Video or video index to convert to numpy arrays. If `None` (the
default), uses the first video.
untracked: If `False` (the default), include only instances that have a
track assignment. If `True`, includes all instances in each frame in
arbitrary order.
return_confidence: If `False` (the default), only return points of nodes. If
`True`, return the points and scores of nodes.
Returns:
An array of tracks of shape `(n_frames, n_tracks, n_nodes, 2)` if
`return_confidence` is `False`. Otherwise returned shape is
`(n_frames, n_tracks, n_nodes, 3)` if `return_confidence` is `True`.
Missing data will be replaced with `np.nan`.
If this is a single instance project, a track does not need to be assigned.
Only predicted instances (NOT user instances) will be returned.
Notes:
This method assumes that instances have tracks assigned and is intended to
function primarily for single-video prediction results.
"""
# Get labeled frames for specified video.
if video is None:
video = 0
if type(video) == int:
video = self.videos[video]
lfs = [lf for lf in self.labeled_frames if lf.video == video]
# Figure out frame index range.
first_frame, last_frame = 0, 0
for lf in lfs:
first_frame = min(first_frame, lf.frame_idx)
last_frame = max(last_frame, lf.frame_idx)
# Figure out the number of tracks based on number of instances in each frame.
# First, let's check the max number of predicted instances (regardless of
# whether they're tracked).
n_preds = 0
for lf in lfs:
n_pred_instances = len(lf.predicted_instances)
n_preds = max(n_preds, n_pred_instances)
# Case 1: We don't care about order because there's only 1 instance per frame,
# or we're considering untracked instances.
untracked = untracked or n_preds == 1
if untracked:
n_tracks = n_preds
else:
# Case 2: We're considering only tracked instances.
n_tracks = len(self.tracks)
n_frames = int(last_frame - first_frame + 1)
skeleton = self.skeletons[-1] # Assume project only uses last skeleton
n_nodes = len(skeleton.nodes)
if return_confidence:
tracks = np.full((n_frames, n_tracks, n_nodes, 3), np.nan, dtype="float32")
else:
tracks = np.full((n_frames, n_tracks, n_nodes, 2), np.nan, dtype="float32")
for lf in lfs:
i = int(lf.frame_idx - first_frame)
if untracked:
for j, inst in enumerate(lf.predicted_instances):
tracks[i, j] = inst.numpy(scores=return_confidence)
else:
tracked_instances = [
inst
for inst in lf.instances
if type(inst) == PredictedInstance and inst.track is not None
]
for inst in tracked_instances:
j = self.tracks.index(inst.track) # type: ignore[arg-type]
tracks[i, j] = inst.numpy(scores=return_confidence)
return tracks
remove_predictions(clean=True)
¶
Remove all predicted instances from the labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
clean | bool | If `True` (the default), also remove any empty frames and unused tracks and skeletons. It does NOT remove videos that have no labeled frames or instances with no visible points. | True |
See also: Labels.clean
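A short usage sketch:

```python
labels.remove_predictions()             # drop predictions, then clean up empty frames
labels.remove_predictions(clean=False)  # keep empty frames and unused tracks/skeletons
```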
Source code in sleap_io/model/labels.py
def remove_predictions(self, clean: bool = True):
"""Remove all predicted instances from the labels.
Args:
clean: If `True` (the default), also remove any empty frames and unused
tracks and skeletons. It does NOT remove videos that have no labeled
frames or instances with no visible points.
See also: `Labels.clean`
"""
for lf in self.labeled_frames:
lf.remove_predictions()
if clean:
self.clean(
frames=True,
empty_instances=False,
skeletons=True,
tracks=True,
videos=False,
)
replace_filenames(new_filenames=None, filename_map=None, prefix_map=None)
¶
Replace video filenames.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_filenames | list[str \| Path] \| None | List of new filenames. Must have the same length as the number of videos in the labels. | None |
filename_map | dict[str \| Path, str \| Path] \| None | Dictionary mapping old filenames (keys) to new filenames (values). | None |
prefix_map | dict[str \| Path, str \| Path] \| None | Dictionary mapping old prefixes (keys) to new prefixes (values). | None |
Notes
Only one of the argument types can be provided.
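A usage sketch with hypothetical paths (each call passes exactly one of the inputs):

```python
labels.replace_filenames(prefix_map={"/data/old": "/mnt/new"})
labels.replace_filenames(filename_map={"session1.mp4": "videos/session1.mp4"})
```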
Source code in sleap_io/model/labels.py
def replace_filenames(
self,
new_filenames: list[str | Path] | None = None,
filename_map: dict[str | Path, str | Path] | None = None,
prefix_map: dict[str | Path, str | Path] | None = None,
):
"""Replace video filenames.
Args:
new_filenames: List of new filenames. Must have the same length as the
number of videos in the labels.
filename_map: Dictionary mapping old filenames (keys) to new filenames
(values).
prefix_map: Dictionary mapping old prefixes (keys) to new prefixes (values).
Notes:
Only one of the argument types can be provided.
"""
n = 0
if new_filenames is not None:
n += 1
if filename_map is not None:
n += 1
if prefix_map is not None:
n += 1
if n != 1:
raise ValueError(
"Exactly one input method must be provided to replace filenames."
)
if new_filenames is not None:
if len(self.videos) != len(new_filenames):
raise ValueError(
f"Number of new filenames ({len(new_filenames)}) does not match "
f"the number of videos ({len(self.videos)})."
)
for video, new_filename in zip(self.videos, new_filenames):
video.replace_filename(new_filename)
elif filename_map is not None:
for video in self.videos:
for old_fn, new_fn in filename_map.items():
if type(video.filename) == list:
new_fns = []
for fn in video.filename:
if Path(fn) == Path(old_fn):
new_fns.append(new_fn)
else:
new_fns.append(fn)
video.replace_filename(new_fns)
else:
if Path(video.filename) == Path(old_fn):
video.replace_filename(new_fn)
elif prefix_map is not None:
for video in self.videos:
for old_prefix, new_prefix in prefix_map.items():
old_prefix, new_prefix = Path(old_prefix), Path(new_prefix)
if type(video.filename) == list:
new_fns = []
for fn in video.filename:
fn = Path(fn)
if fn.as_posix().startswith(old_prefix.as_posix()):
new_fns.append(new_prefix / fn.relative_to(old_prefix))
else:
new_fns.append(fn)
video.replace_filename(new_fns)
else:
fn = Path(video.filename)
if fn.as_posix().startswith(old_prefix.as_posix()):
video.replace_filename(
new_prefix / fn.relative_to(old_prefix)
)
replace_videos(old_videos=None, new_videos=None, video_map=None)
¶
Replace videos and update all references.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
old_videos | list[Video] \| None | List of videos to be replaced. | None |
new_videos | list[Video] \| None | List of videos to replace with. | None |
video_map | dict[Video, Video] \| None | Alternative input of a dictionary where keys are the old videos and values are the new videos. | None |
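A short sketch, assuming `Video.from_filename` and a hypothetical replacement file:

```python
new_video = sio.Video.from_filename("reencoded.mp4")  # hypothetical file
labels.replace_videos(video_map={labels.video: new_video})
```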
Source code in sleap_io/model/labels.py
def replace_videos(
self,
old_videos: list[Video] | None = None,
new_videos: list[Video] | None = None,
video_map: dict[Video, Video] | None = None,
):
"""Replace videos and update all references.
Args:
old_videos: List of videos to be replaced.
new_videos: List of videos to replace with.
video_map: Alternative input of dictionary where keys are the old videos and
values are the new videos.
"""
if video_map is None:
video_map = {o: n for o, n in zip(old_videos, new_videos)}
# Update the labeled frames with the new videos.
for lf in self.labeled_frames:
if lf.video in video_map:
lf.video = video_map[lf.video]
# Update suggestions with the new videos.
for sf in self.suggestions:
if sf.video in video_map:
sf.video = video_map[sf.video]
save(filename, format=None, embed=None, **kwargs)
¶
Save labels to file in specified format.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path to save labels to. | required |
format | Optional[str] | The format to save the labels in. If `None`, the format will be inferred from the file extension. Available formats are `"slp"`, `"nwb"`, `"labelstudio"`, and `"jabs"`. | None |
embed | bool \| str \| list[tuple[Video, int]] \| None | Frames to embed in the saved labels file. One of `None`, `True`, `"all"`, `"user"`, `"suggestions"`, `"user+suggestions"`, `"source"`, or a list of tuples of `(video, frame_idx)`. If `None` (the default) and the labels contain embedded frames, those embedded frames will be re-saved to the new file. If `True` or `"all"`, all labeled frames and suggested frames will be embedded. If `"source"` is specified, no images will be embedded and the source video will be restored if available. This argument is only valid for the SLP backend. | None |
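A short usage sketch with hypothetical output paths:

```python
labels.save("labels.slp")                     # format inferred from the extension
labels.save("dataset.pkg.slp", embed="user")  # embed user-labeled frame images (SLP only)
```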
Source code in sleap_io/model/labels.py
def save(
self,
filename: str,
format: Optional[str] = None,
embed: bool | str | list[tuple[Video, int]] | None = None,
**kwargs,
):
"""Save labels to file in specified format.
Args:
filename: Path to save labels to.
format: The format to save the labels in. If `None`, the format will be
inferred from the file extension. Available formats are `"slp"`,
`"nwb"`, `"labelstudio"`, and `"jabs"`.
embed: Frames to embed in the saved labels file. One of `None`, `True`,
`"all"`, `"user"`, `"suggestions"`, `"user+suggestions"`, `"source"` or
list of tuples of `(video, frame_idx)`.
If `None` is specified (the default) and the labels contains embedded
frames, those embedded frames will be re-saved to the new file.
If `True` or `"all"`, all labeled frames and suggested frames will be
embedded.
If `"source"` is specified, no images will be embedded and the source
video will be restored if available.
This argument is only valid for the SLP backend.
"""
from sleap_io import save_file
save_file(self, filename, format=format, embed=embed, **kwargs)
split(n, seed=None)
¶
Separate the labels into random splits.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n | int \| float | Size of the first split. If an integer >= 1, this is the number of labeled frames in the first split. If < 1.0, this will be treated as a fraction of the total labeled frames. | required |
seed | int \| None | Optional integer seed to use for reproducibility. | None |
Returns:
Type | Description |
---|---|
tuple[Labels, Labels] | A tuple of `split1, split2`. If an integer was specified, `len(split1) == n`. If a fraction was specified, `len(split1) == int(n * len(labels))`. The second split contains the remainder, i.e., `len(split2) == len(labels) - len(split1)`. If there are too few frames, a minimum of 1 frame will be kept in the second split. If there is exactly 1 labeled frame in the labels, the same frame will be assigned to both splits. |
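A short usage sketch:

```python
train, val = labels.split(0.8, seed=0)
# With more than one labeled frame: len(train) == int(0.8 * len(labels))
# and len(val) == len(labels) - len(train).
```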
Source code in sleap_io/model/labels.py
def split(self, n: int | float, seed: int | None = None) -> tuple[Labels, Labels]:
"""Separate the labels into random splits.
Args:
n: Size of the first split. If integer >= 1, assumes that this is the number
of labeled frames in the first split. If < 1.0, this will be treated as
a fraction of the total labeled frames.
seed: Optional integer seed to use for reproducibility.
Returns:
A tuple of `split1, split2`.
If an integer was specified, `len(split1) == n`.
If a fraction was specified, `len(split1) == int(n * len(labels))`.
The second split contains the remainder, i.e.,
`len(split2) == len(labels) - len(split1)`.
If there are too few frames, a minimum of 1 frame will be kept in the second
split.
If there is exactly 1 labeled frame in the labels, the same frame will be
assigned to both splits.
"""
n0 = len(self)
if n0 == 0:
return self, self
n1 = n
if n < 1.0:
n1 = max(int(n0 * float(n)), 1)
n2 = max(n0 - n1, 1)
n1, n2 = int(n1), int(n2)
rng = np.random.default_rng(seed=seed)
inds1 = rng.choice(n0, size=(n1,), replace=False)
if n0 == 1:
inds2 = np.array([0])
else:
inds2 = np.setdiff1d(np.arange(n0), inds1)
split1, split2 = self[inds1], self[inds2]
split1, split2 = deepcopy(split1), deepcopy(split2)
split1, split2 = Labels(split1), Labels(split2)
split1.provenance = self.provenance
split2.provenance = self.provenance
split1.provenance["source_labels"] = self.provenance.get("filename", None)
split2.provenance["source_labels"] = self.provenance.get("filename", None)
return split1, split2
update()
¶
Update data structures based on contents.
This function will update the list of skeletons, videos and tracks from the labeled frames, instances and suggestions.
Source code in sleap_io/model/labels.py
def update(self):
"""Update data structures based on contents.
This function will update the list of skeletons, videos and tracks from the
labeled frames, instances and suggestions.
"""
for lf in self.labeled_frames:
if lf.video not in self.videos:
self.videos.append(lf.video)
for inst in lf:
if inst.skeleton not in self.skeletons:
self.skeletons.append(inst.skeleton)
if inst.track is not None and inst.track not in self.tracks:
self.tracks.append(inst.track)
for sf in self.suggestions:
if sf.video not in self.videos:
self.videos.append(sf.video)
sleap_io.LabeledFrame
¶
Labeled data for a single frame of a video.
Attributes:
Name | Type | Description |
---|---|---|
video | Video | The `Video` associated with this `LabeledFrame`. |
frame_idx | int | The index of the `LabeledFrame` in the `Video`. |
instances | list[Union[Instance, PredictedInstance]] | List of `Instance` objects associated with this `LabeledFrame`. |
Notes
Instances of this class are hashed by identity, not by value. This means that two `LabeledFrame` instances with the same attributes will NOT be considered equal in a set or dict.
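A brief sketch, reusing the `video` and `instance` objects from the `Labels` example above:

```python
lf = sio.LabeledFrame(video=video, frame_idx=12, instances=[instance])
len(lf)                # number of instances in the frame
lf.has_user_instances  # True if any non-predicted instances are present
pts = lf.numpy()       # (n_instances, n_nodes, 2) array of points
```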
Source code in sleap_io/model/labeled_frame.py
@define(eq=False)
class LabeledFrame:
"""Labeled data for a single frame of a video.
Attributes:
video: The `Video` associated with this `LabeledFrame`.
frame_idx: The index of the `LabeledFrame` in the `Video`.
instances: List of `Instance` objects associated with this `LabeledFrame`.
Notes:
Instances of this class are hashed by identity, not by value. This means that
two `LabeledFrame` instances with the same attributes will NOT be considered
equal in a set or dict.
"""
video: Video
frame_idx: int
instances: list[Union[Instance, PredictedInstance]] = field(factory=list)
def __len__(self) -> int:
"""Return the number of instances in the frame."""
return len(self.instances)
def __getitem__(self, key: int) -> Union[Instance, PredictedInstance]:
"""Return the `Instance` at `key` index in the `instances` list."""
return self.instances[key]
def __iter__(self):
"""Iterate over `Instance`s in `instances` list."""
return iter(self.instances)
@property
def user_instances(self) -> list[Instance]:
"""Frame instances that are user-labeled (`Instance` objects)."""
return [inst for inst in self.instances if type(inst) == Instance]
@property
def has_user_instances(self) -> bool:
"""Return True if the frame has any user-labeled instances."""
for inst in self.instances:
if type(inst) == Instance:
return True
return False
@property
def predicted_instances(self) -> list[Instance]:
"""Frame instances that are predicted by a model (`PredictedInstance` objects)."""
return [inst for inst in self.instances if type(inst) == PredictedInstance]
@property
def has_predicted_instances(self) -> bool:
"""Return True if the frame has any predicted instances."""
for inst in self.instances:
if type(inst) == PredictedInstance:
return True
return False
def numpy(self) -> np.ndarray:
"""Return all instances in the frame as a numpy array.
Returns:
Points as a numpy array of shape `(n_instances, n_nodes, 2)`.
Note that the order of the instances is arbitrary.
"""
n_instances = len(self.instances)
n_nodes = len(self.instances[0]) if n_instances > 0 else 0
pts = np.full((n_instances, n_nodes, 2), np.nan)
for i, inst in enumerate(self.instances):
pts[i] = inst.numpy()[:, 0:2]
return pts
@property
def image(self) -> np.ndarray:
"""Return the image of the frame as a numpy array."""
return self.video[self.frame_idx]
@property
def unused_predictions(self) -> list[Instance]:
"""Return a list of "unused" `PredictedInstance` objects in frame.
This is all of the `PredictedInstance` objects which do not have a corresponding
`Instance` in the same track in the same frame.
"""
unused_predictions = []
any_tracks = [inst.track for inst in self.instances if inst.track is not None]
if len(any_tracks):
# Use tracks to determine which predicted instances have been used
used_tracks = [
inst.track
for inst in self.instances
if type(inst) == Instance and inst.track is not None
]
unused_predictions = [
inst
for inst in self.instances
if inst.track not in used_tracks and type(inst) == PredictedInstance
]
else:
# Use from_predicted to determine which predicted instances have been used
# TODO: should we always do this instead of using tracks?
used_instances = [
inst.from_predicted
for inst in self.instances
if inst.from_predicted is not None
]
unused_predictions = [
inst
for inst in self.instances
if type(inst) == PredictedInstance and inst not in used_instances
]
return unused_predictions
def remove_predictions(self):
"""Remove all `PredictedInstance` objects from the frame."""
self.instances = [inst for inst in self.instances if type(inst) == Instance]
def remove_empty_instances(self):
"""Remove all instances with no visible points."""
self.instances = [inst for inst in self.instances if not inst.is_empty]
has_predicted_instances: bool
property
¶
Return True if the frame has any predicted instances.
has_user_instances: bool
property
¶
Return True if the frame has any user-labeled instances.
image: np.ndarray
property
¶
Return the image of the frame as a numpy array.
predicted_instances: list[Instance]
property
¶
Frame instances that are predicted by a model (`PredictedInstance` objects).
unused_predictions: list[Instance]
property
¶
Return a list of "unused" `PredictedInstance` objects in frame.
This is all of the `PredictedInstance` objects which do not have a corresponding `Instance` in the same track in the same frame.
user_instances: list[Instance]
property
¶
Frame instances that are user-labeled (`Instance` objects).
__getitem__(key)
¶
Return the `Instance` at `key` index in the `instances` list.
__iter__()
¶
Iterate over `Instance`s in `instances` list.
__len__()
¶
Return the number of instances in the frame.
numpy()
¶
Return all instances in the frame as a numpy array.
Returns:
Type | Description |
---|---|
ndarray | Points as a numpy array of shape `(n_instances, n_nodes, 2)`. Note that the order of the instances is arbitrary. |
Source code in sleap_io/model/labeled_frame.py
def numpy(self) -> np.ndarray:
"""Return all instances in the frame as a numpy array.
Returns:
Points as a numpy array of shape `(n_instances, n_nodes, 2)`.
Note that the order of the instances is arbitrary.
"""
n_instances = len(self.instances)
n_nodes = len(self.instances[0]) if n_instances > 0 else 0
pts = np.full((n_instances, n_nodes, 2), np.nan)
for i, inst in enumerate(self.instances):
pts[i] = inst.numpy()[:, 0:2]
return pts
remove_empty_instances()
¶
Remove all instances with no visible points.
remove_predictions()
¶
Remove all `PredictedInstance` objects from the frame.
sleap_io.Instance
¶
This class represents a ground truth instance such as an animal.
An `Instance` has a set of landmarks (`Point`s) that correspond to the nodes defined in its `Skeleton`. It may also be associated with a `Track` which links multiple instances together across frames or videos.
Attributes:
Name | Type | Description |
---|---|---|
points | Union[dict[Node, Point], dict[Node, PredictedPoint]] | A dictionary with keys as `Node`s and values as `Point`s containing all of the landmarks of the instance. This can also be specified as a dictionary with node names, a list of length `n_nodes`, or a numpy array of shape `(n_nodes, 2)`. |
skeleton | Skeleton | The `Skeleton` that describes the `Node`s and `Edge`s associated with this instance. |
track | Optional[Track] | An optional `Track` associated with a unique animal/object across frames or videos. |
from_predicted | Optional[PredictedInstance] | The `PredictedInstance` (if any) that this instance was initialized from. This is used with human-in-the-loop workflows. |
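A brief construction sketch (the `Skeleton` constructor from node names is an assumption not documented in this section):

```python
import numpy as np
import sleap_io as sio

skeleton = sio.Skeleton(["head", "tail"])  # assumed construction from node names
inst = sio.Instance({"head": [10.0, 20.0], "tail": [30.0, 40.0]}, skeleton=skeleton)
inst2 = sio.Instance.from_numpy(np.array([[10.0, 20.0], [np.nan, np.nan]]), skeleton)
inst2.n_visible  # 1 -- the NaN point is created as not visible
```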
Source code in sleap_io/model/instance.py
@define(auto_attribs=True, slots=True, eq=True)
class Instance:
"""This class represents a ground truth instance such as an animal.
An `Instance` has a set of landmarks (`Point`s) that correspond to the nodes defined
in its `Skeleton`.
It may also be associated with a `Track` which links multiple instances together
across frames or videos.
Attributes:
points: A dictionary with keys as `Node`s and values as `Point`s containing all
of the landmarks of the instance. This can also be specified as a dictionary
with node names, a list of length `n_nodes`, or a numpy array of shape
`(n_nodes, 2)`.
skeleton: The `Skeleton` that describes the `Node`s and `Edge`s associated with
this instance.
track: An optional `Track` associated with a unique animal/object across frames
or videos.
from_predicted: The `PredictedInstance` (if any) that this instance was
initialized from. This is used with human-in-the-loop workflows.
"""
_POINT_TYPE = Point
def _make_default_point(self, x, y):
return self._POINT_TYPE(x, y, visible=not (math.isnan(x) or math.isnan(y)))
def _convert_points(self, attr, points):
"""Maintain points mappings between nodes and points."""
if type(points) == np.ndarray:
points = points.tolist()
if type(points) == list:
if len(points) != len(self.skeleton):
raise ValueError(
"If specifying points as a list, must provide as many points as "
"nodes in the skeleton."
)
points = {node: pt for node, pt in zip(self.skeleton.nodes, points)}
if type(points) == dict:
keys = [
node if type(node) == Node else self.skeleton[node]
for node in points.keys()
]
vals = [
(
point
if type(point) == self._POINT_TYPE
else self._make_default_point(*point)
)
for point in points.values()
]
points = {k: v for k, v in zip(keys, vals)}
missing_nodes = list(set(self.skeleton.nodes) - set(points.keys()))
for node in missing_nodes:
points[node] = self._make_default_point(x=np.nan, y=np.nan)
return points
points: Union[dict[Node, Point], dict[Node, PredictedPoint]] = field(
on_setattr=_convert_points, eq=cmp_using(eq=_compare_points) # type: ignore
)
skeleton: Skeleton
track: Optional[Track] = None
from_predicted: Optional[PredictedInstance] = None
def __attrs_post_init__(self):
"""Maintain point mappings between node and points after initialization."""
super().__setattr__("points", self._convert_points(None, self.points))
def __getitem__(self, node: Union[int, str, Node]) -> Optional[Point]:
"""Return the point associated with a node or `None` if not set."""
if (type(node) == int) or (type(node) == str):
node = self.skeleton[node]
if isinstance(node, Node):
return self.points.get(node, None)
else:
raise IndexError(f"Invalid indexing argument for instance: {node}")
def __len__(self) -> int:
"""Return the number of points in the instance."""
return len(self.points)
def __repr__(self) -> str:
"""Return a readable representation of the instance."""
pts = self.numpy().tolist()
track = f'"{self.track.name}"' if self.track is not None else self.track
return f"Instance(points={pts}, track={track})"
@property
def n_visible(self) -> int:
"""Return the number of visible points in the instance."""
return sum(pt.visible for pt in self.points.values())
@property
def is_empty(self) -> bool:
"""Return `True` if no points are visible on the instance."""
return self.n_visible == 0
@classmethod
def from_numpy(
cls, points: np.ndarray, skeleton: Skeleton, track: Optional[Track] = None
) -> "Instance":
"""Create an instance object from a numpy array.
Args:
points: A numpy array of shape `(n_nodes, 2)` corresponding to the points of
the skeleton. Values of `np.nan` indicate "missing" nodes.
skeleton: The `Skeleton` that this `Instance` is associated with. It should
have `n_nodes` nodes.
track: An optional `Track` associated with a unique animal/object across
frames or videos.
"""
return cls(
points=points, skeleton=skeleton, track=track # type: ignore[arg-type]
)
def numpy(self) -> np.ndarray:
"""Return the instance points as a numpy array."""
pts = np.full((len(self.skeleton), 2), np.nan)
for node, point in self.points.items():
if point.visible:
pts[self.skeleton.index(node)] = point.numpy()
return pts
is_empty: bool
property
¶
Return `True` if no points are visible on the instance.
n_visible: int
property
¶
Return the number of visible points in the instance.
__attrs_post_init__()
¶
Maintain point mappings between node and points after initialization.
__getitem__(node)
¶
Return the point associated with a node or `None` if not set.
Source code in sleap_io/model/instance.py
def __getitem__(self, node: Union[int, str, Node]) -> Optional[Point]:
"""Return the point associated with a node or `None` if not set."""
if (type(node) == int) or (type(node) == str):
node = self.skeleton[node]
if isinstance(node, Node):
return self.points.get(node, None)
else:
raise IndexError(f"Invalid indexing argument for instance: {node}")
__len__()
¶
Return the number of points in the instance.
__repr__()
¶
Return a readable representation of the instance.
from_numpy(points, skeleton, track=None)
classmethod
¶
Create an instance object from a numpy array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
points | ndarray | A numpy array of shape `(n_nodes, 2)` corresponding to the points of the skeleton. Values of `np.nan` indicate "missing" nodes. | required |
skeleton | Skeleton | The `Skeleton` that this `Instance` is associated with. It should have `n_nodes` nodes. | required |
track | Optional[Track] | An optional `Track` associated with a unique animal/object across frames or videos. | None |
Source code in sleap_io/model/instance.py
@classmethod
def from_numpy(
cls, points: np.ndarray, skeleton: Skeleton, track: Optional[Track] = None
) -> "Instance":
"""Create an instance object from a numpy array.
Args:
points: A numpy array of shape `(n_nodes, 2)` corresponding to the points of
the skeleton. Values of `np.nan` indicate "missing" nodes.
skeleton: The `Skeleton` that this `Instance` is associated with. It should
have `n_nodes` nodes.
track: An optional `Track` associated with a unique animal/object across
frames or videos.
"""
return cls(
points=points, skeleton=skeleton, track=track # type: ignore[arg-type]
)
numpy()
¶
Return the instance points as a numpy array.
sleap_io.PredictedInstance
¶
Bases: `Instance`
A `PredictedInstance` is an `Instance` that was predicted using a model.
Attributes:
Name | Type | Description |
---|---|---|
skeleton | | The `Skeleton` that this `Instance` is associated with. |
points | | A dictionary where keys are `Skeleton` nodes and values are `Point`s. |
track | | An optional `Track` associated with a unique animal/object across frames or videos. |
from_predicted | Optional[PredictedInstance] | Not applicable in `PredictedInstance`s (must be set to `None`). |
score | float | The instance detection or part grouping prediction score. This is a scalar that represents the confidence with which this entire instance was predicted. This may not always be applicable depending on the model type. |
tracking_score | Optional[float] | The score associated with the `Track` assignment. This is typically the value from the score matrix used in an identity assignment. |
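A brief sketch, continuing from the two-node `skeleton` in the `Instance` example above:

```python
import numpy as np

pred = sio.PredictedInstance.from_numpy(
    points=np.array([[10.0, 20.0], [30.0, 40.0]]),
    point_scores=np.array([0.9, 0.8]),
    instance_score=0.85,
    skeleton=skeleton,
)
pred.numpy(scores=True)  # (n_nodes, 3) array of x, y, score per node
```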
Source code in sleap_io/model/instance.py
@define
class PredictedInstance(Instance):
"""A `PredictedInstance` is an `Instance` that was predicted using a model.
Attributes:
skeleton: The `Skeleton` that this `Instance` is associated with.
points: A dictionary where keys are `Skeleton` nodes and values are `Point`s.
track: An optional `Track` associated with a unique animal/object across frames
or videos.
from_predicted: Not applicable in `PredictedInstance`s (must be set to `None`).
score: The instance detection or part grouping prediction score. This is a
scalar that represents the confidence with which this entire instance was
predicted. This may not always be applicable depending on the model type.
tracking_score: The score associated with the `Track` assignment. This is
typically the value from the score matrix used in an identity assignment.
"""
_POINT_TYPE = PredictedPoint
from_predicted: Optional[PredictedInstance] = field(
default=None, validator=validators.instance_of(type(None))
)
score: float = 0.0
tracking_score: Optional[float] = 0
def __repr__(self) -> str:
"""Return a readable representation of the instance."""
pts = self.numpy().tolist()
track = f'"{self.track.name}"' if self.track is not None else self.track
score = str(self.score) if self.score is None else f"{self.score:.2f}"
tracking_score = (
str(self.tracking_score)
if self.tracking_score is None
else f"{self.tracking_score:.2f}"
)
return (
f"PredictedInstance(points={pts}, track={track}, "
f"score={score}, tracking_score={tracking_score})"
)
@classmethod
def from_numpy( # type: ignore[override]
cls,
points: np.ndarray,
point_scores: np.ndarray,
instance_score: float,
skeleton: Skeleton,
tracking_score: Optional[float] = None,
track: Optional[Track] = None,
) -> "PredictedInstance":
"""Create an instance object from a numpy array.
Args:
points: A numpy array of shape `(n_nodes, 2)` corresponding to the points of
the skeleton. Values of `np.nan` indicate "missing" nodes.
point_scores: The points-level prediction score. This is an array that
represents the confidence with which each point in the instance was
predicted. This may not always be applicable depending on the model
type.
instance_score: The instance detection or part grouping prediction score.
This is a scalar that represents the confidence with which this entire
instance was predicted. This may not always be applicable depending on
the model type.
skeleton: The `Skeleton` that this `Instance` is associated with. It should
have `n_nodes` nodes.
tracking_score: The score associated with the `Track` assignment. This is
typically the value from the score matrix used in an identity
assignment.
track: An optional `Track` associated with a unique animal/object across
frames or videos.
"""
node_points = {
node: PredictedPoint(pt[0], pt[1], score=score)
for node, pt, score in zip(skeleton.nodes, points, point_scores)
}
return cls(
points=node_points,
skeleton=skeleton,
score=instance_score,
tracking_score=tracking_score,
track=track,
)
def numpy(self, scores: bool = False) -> np.ndarray:
"""Return the instance points as a numpy array."""
pts = np.full((len(self.skeleton), 3), np.nan)
for node, point in self.points.items():
if point.visible:
pts[self.skeleton.index(node)] = point.numpy()
if not scores:
pts = pts[:, :2]
return pts
__repr__()
¶
Return a readable representation of the instance.
Source code in sleap_io/model/instance.py
def __repr__(self) -> str:
"""Return a readable representation of the instance."""
pts = self.numpy().tolist()
track = f'"{self.track.name}"' if self.track is not None else self.track
score = str(self.score) if self.score is None else f"{self.score:.2f}"
tracking_score = (
str(self.tracking_score)
if self.tracking_score is None
else f"{self.tracking_score:.2f}"
)
return (
f"PredictedInstance(points={pts}, track={track}, "
f"score={score}, tracking_score={tracking_score})"
)
from_numpy(points, point_scores, instance_score, skeleton, tracking_score=None, track=None)
classmethod
¶
Create an instance object from a numpy array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
points | ndarray | A numpy array of shape `(n_nodes, 2)` corresponding to the points of the skeleton. Values of `np.nan` indicate "missing" nodes. | required |
point_scores | ndarray | The point-level prediction scores. This is an array that represents the confidence with which each point in the instance was predicted. This may not always be applicable depending on the model type. | required |
instance_score | float | The instance detection or part grouping prediction score. This is a scalar that represents the confidence with which this entire instance was predicted. This may not always be applicable depending on the model type. | required |
skeleton | Skeleton | The `Skeleton` that this `Instance` is associated with. It should have `n_nodes` nodes. | required |
tracking_score | Optional[float] | The score associated with the `Track` assignment. This is typically the value from the score matrix used in an identity assignment. | None |
track | Optional[Track] | An optional `Track` associated with a unique animal/object across frames or videos. | None |
Source code in sleap_io/model/instance.py
@classmethod
def from_numpy( # type: ignore[override]
cls,
points: np.ndarray,
point_scores: np.ndarray,
instance_score: float,
skeleton: Skeleton,
tracking_score: Optional[float] = None,
track: Optional[Track] = None,
) -> "PredictedInstance":
"""Create an instance object from a numpy array.
Args:
points: A numpy array of shape `(n_nodes, 2)` corresponding to the points of
the skeleton. Values of `np.nan` indicate "missing" nodes.
point_scores: The points-level prediction score. This is an array that
represents the confidence with which each point in the instance was
predicted. This may not always be applicable depending on the model
type.
instance_score: The instance detection or part grouping prediction score.
This is a scalar that represents the confidence with which this entire
instance was predicted. This may not always be applicable depending on
the model type.
skeleton: The `Skeleton` that this `Instance` is associated with. It should
have `n_nodes` nodes.
tracking_score: The score associated with the `Track` assignment. This is
typically the value from the score matrix used in an identity
assignment.
track: An optional `Track` associated with a unique animal/object across
frames or videos.
"""
node_points = {
node: PredictedPoint(pt[0], pt[1], score=score)
for node, pt, score in zip(skeleton.nodes, points, point_scores)
}
return cls(
points=node_points,
skeleton=skeleton,
score=instance_score,
tracking_score=tracking_score,
track=track,
)
numpy(scores=False)
¶
Return the instance points as a numpy array.
Source code in sleap_io/model/instance.py
def numpy(self, scores: bool = False) -> np.ndarray:
    """Return the instance points as a numpy array."""
    pts = np.full((len(self.skeleton), 3), np.nan)
    for node, point in self.points.items():
        if point.visible:
            pts[self.skeleton.index(node)] = point.numpy()
    if not scores:
        pts = pts[:, :2]
    return pts
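Putting `PredictedInstance` together, a hedged usage sketch (assumes top-level imports from `sleap_io`; all coordinates and scores are illustrative):
import numpy as np
from sleap_io import PredictedInstance, Skeleton, Track

skeleton = Skeleton(nodes=["head", "thorax", "abdomen"])

pred = PredictedInstance.from_numpy(
    points=np.array([[10.0, 20.0], [15.0, 25.0], [18.0, 30.0]]),
    point_scores=np.array([0.95, 0.87, 0.42]),
    instance_score=0.91,
    skeleton=skeleton,
    tracking_score=0.88,
    track=Track(name="animal_0"),
)

print(pred.numpy().shape)             # (3, 2): coordinates only
print(pred.numpy(scores=True).shape)  # (3, 3): coordinates plus per-point scores
print(pred)                           # repr includes score and tracking_score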
sleap_io.Point
¶
A 2D spatial landmark and metadata associated with annotation.
Attributes:
Name | Type | Description |
---|---|---|
x | float | The horizontal pixel location of point in image coordinates. |
y | float | The vertical pixel location of point in image coordinates. |
visible | bool | Whether point is visible in the image or not. |
complete | bool | Has the point been verified by the user labeler. |
Class variables
eq_atol: Controls the absolute tolerance allowed in `x` and `y` when comparing two `Point`s for equality.
eq_rtol: Controls the relative tolerance allowed in `x` and `y` when comparing two `Point`s for equality.
Source code in sleap_io/model/instance.py
@define
class Point:
"""A 2D spatial landmark and metadata associated with annotation.
Attributes:
x: The horizontal pixel location of point in image coordinates.
y: The vertical pixel location of point in image coordinates.
visible: Whether point is visible in the image or not.
complete: Has the point been verified by the user labeler.
Class variables:
eq_atol: Controls absolute tolerence allowed in `x` and `y` when comparing two
`Point`s for equality.
eq_rtol: Controls relative tolerence allowed in `x` and `y` when comparing two
`Point`s for equality.
"""
eq_atol: ClassVar[float] = 1e-08
eq_rtol: ClassVar[float] = 0
x: float
y: float
visible: bool = True
complete: bool = False
def __eq__(self, other: object) -> bool:
"""Compare `self` and `other` for equality.
Precision error between the respective `x` and `y` properties of two
instances may be allowed or controlled via the `Point.eq_atol` and
`Point.eq_rtol` class variables. Set to zero to disable their effect.
Internally, `numpy.isclose()` is used for the comparison:
https://numpy.org/doc/stable/reference/generated/numpy.isclose.html
Args:
other: Instance of `Point` to compare to.
Returns:
Returns True if all attributes of `self` and `other` are the identical
(possibly allowing precision error for `x` and `y` attributes).
"""
# Check that other is a Point.
if type(other) is not type(self):
return False
# We know that we have some kind of point at this point.
other = cast(Point, other)
return bool(
np.all(
np.isclose(
[self.x, self.y],
[other.x, other.y],
rtol=Point.eq_rtol,
atol=Point.eq_atol,
equal_nan=True,
)
)
and (self.visible == other.visible)
and (self.complete == other.complete)
)
def numpy(self) -> np.ndarray:
"""Return the coordinates as a numpy array of shape `(2,)`."""
return np.array([self.x, self.y]) if self.visible else np.full((2,), np.nan)
__eq__(other)
¶
Compare `self` and `other` for equality.
Precision error between the respective `x` and `y` properties of two instances may be allowed or controlled via the `Point.eq_atol` and `Point.eq_rtol` class variables. Set to zero to disable their effect.
Internally, `numpy.isclose()` is used for the comparison: https://numpy.org/doc/stable/reference/generated/numpy.isclose.html
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | Instance of `Point` to compare to. | required |
Returns:
Type | Description |
---|---|
bool | Returns `True` if all attributes of `self` and `other` are identical (possibly allowing precision error for `x` and `y` attributes). |
Source code in sleap_io/model/instance.py
def __eq__(self, other: object) -> bool:
"""Compare `self` and `other` for equality.
Precision error between the respective `x` and `y` properties of two
instances may be allowed or controlled via the `Point.eq_atol` and
`Point.eq_rtol` class variables. Set to zero to disable their effect.
Internally, `numpy.isclose()` is used for the comparison:
https://numpy.org/doc/stable/reference/generated/numpy.isclose.html
Args:
other: Instance of `Point` to compare to.
Returns:
Returns True if all attributes of `self` and `other` are the identical
(possibly allowing precision error for `x` and `y` attributes).
"""
# Check that other is a Point.
if type(other) is not type(self):
return False
# We know that we have some kind of point at this point.
other = cast(Point, other)
return bool(
np.all(
np.isclose(
[self.x, self.y],
[other.x, other.y],
rtol=Point.eq_rtol,
atol=Point.eq_atol,
equal_nan=True,
)
)
and (self.visible == other.visible)
and (self.complete == other.complete)
)
numpy()
¶
Return the coordinates as a numpy array of shape `(2,)`.
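An illustrative sketch of `Point` equality tolerances and conversion (values invented; assumes a top-level `sleap_io.Point` import):
from sleap_io import Point

p1 = Point(x=10.0, y=20.0)
p2 = Point(x=10.0 + 1e-9, y=20.0)

# Equality tolerates tiny numeric differences via the class variables.
print(p1 == p2)        # True with the default eq_atol of 1e-08

Point.eq_atol = 0.0    # disable the absolute tolerance
print(p1 == p2)        # False once the tolerance is removed
Point.eq_atol = 1e-08  # restore the default

# Non-visible points convert to NaN coordinates.
print(Point(x=5.0, y=5.0, visible=False).numpy())  # [nan nan]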
sleap_io.PredictedPoint
¶
Bases: Point
A predicted point with associated score generated by a prediction model.
It has all the properties of a labeled `Point`, plus a `score`.
Attributes:
Name | Type | Description |
---|---|---|
x | | The horizontal pixel location of point within image frame. |
y | | The vertical pixel location of point within image frame. |
visible | | Whether point is visible in the image or not. |
complete | | Has the point been verified by the user labeler. |
score | float | The point-level prediction score. This is typically the confidence and set to a value between 0 and 1. |
Source code in sleap_io/model/instance.py
@define
class PredictedPoint(Point):
"""A predicted point with associated score generated by a prediction model.
It has all the properties of a labeled `Point`, plus a `score`.
Attributes:
x: The horizontal pixel location of point within image frame.
y: The vertical pixel location of point within image frame.
visible: Whether point is visible in the image or not.
complete: Has the point been verified by the user labeler.
score: The point-level prediction score. This is typically the confidence and
set to a value between 0 and 1.
"""
score: float = 0.0
def numpy(self) -> np.ndarray:
"""Return the coordinates and score as a numpy array of shape `(3,)`."""
return (
np.array([self.x, self.y, self.score])
if self.visible
else np.full((3,), np.nan)
)
def __eq__(self, other: object) -> bool:
"""Compare `self` and `other` for equality.
See `Point.__eq__()` for important notes about point equality semantics!
Args:
other: Instance of `PredictedPoint` to compare
Returns:
Returns True if all attributes of `self` and `other` are the identical
(possibly allowing precision error for `x` and `y` attributes).
"""
if not super().__eq__(other):
return False
# we know that we have a point at this point
other = cast(PredictedPoint, other)
return self.score == other.score
__eq__(other)
¶
Compare `self` and `other` for equality.
See `Point.__eq__()` for important notes about point equality semantics!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | Instance of `PredictedPoint` to compare to. | required |
Returns:
Type | Description |
---|---|
bool | Returns `True` if all attributes of `self` and `other` are identical (possibly allowing precision error for `x` and `y` attributes). |
Source code in sleap_io/model/instance.py
def __eq__(self, other: object) -> bool:
"""Compare `self` and `other` for equality.
See `Point.__eq__()` for important notes about point equality semantics!
Args:
other: Instance of `PredictedPoint` to compare
Returns:
Returns True if all attributes of `self` and `other` are the identical
(possibly allowing precision error for `x` and `y` attributes).
"""
if not super().__eq__(other):
return False
# we know that we have a point at this point
other = cast(PredictedPoint, other)
return self.score == other.score
numpy()
¶
Return the coordinates and score as a numpy array of shape `(3,)`.
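A small illustrative sketch with made-up values:
from sleap_io import Point, PredictedPoint

pp = PredictedPoint(x=12.5, y=7.0, score=0.83)
print(pp.numpy())  # [12.5, 7.0, 0.83]: coordinates plus the prediction score

# Equality is type-sensitive: a PredictedPoint never equals a plain Point.
print(pp == Point(x=12.5, y=7.0))  # False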
sleap_io.Skeleton
¶
A description of a set of landmark types and connections between them.
Skeletons are represented by a directed graph composed of a set of `Node`s (landmark types such as body parts) and `Edge`s (connections between parts).
Attributes:
Name | Type | Description |
---|---|---|
nodes | list[Node] | A list of `Node`s. May be specified as a list of strings to create new nodes from their names. |
edges | list[Edge] | A list of `Edge`s. May be specified as a list of 2-tuples of string names or integer indices of `nodes`. Each edge corresponds to a pair of source and destination nodes forming a directed edge. |
symmetries | list[Symmetry] | A list of `Symmetry`s. Each symmetry corresponds to symmetric body parts, such as `"left eye", "right eye"`. This is used when applying flip (reflection) augmentation to images in order to appropriately swap the indices of symmetric landmarks. |
name | Optional[str] | A descriptive name for the `Skeleton`. |
Source code in sleap_io/model/skeleton.py
@define
class Skeleton:
"""A description of a set of landmark types and connections between them.
Skeletons are represented by a directed graph composed of a set of `Node`s (landmark
types such as body parts) and `Edge`s (connections between parts).
Attributes:
nodes: A list of `Node`s. May be specified as a list of strings to create new
nodes from their names.
edges: A list of `Edge`s. May be specified as a list of 2-tuples of string names
or integer indices of `nodes`. Each edge corresponds to a pair of source and
destination nodes forming a directed edge.
symmetries: A list of `Symmetry`s. Each symmetry corresponds to symmetric body
parts, such as `"left eye", "right eye"`. This is used when applying flip
(reflection) augmentation to images in order to appropriately swap the
indices of symmetric landmarks.
name: A descriptive name for the `Skeleton`.
"""
def _update_node_map(self, attr, nodes):
"""Callback for maintaining node name/index to `Node` map."""
self._node_name_map = {node.name: node for node in nodes}
self._node_ind_map = {node: i for i, node in enumerate(nodes)}
nodes: list[Node] = field(factory=list, on_setattr=_update_node_map)
edges: list[Edge] = field(factory=list)
symmetries: list[Symmetry] = field(factory=list)
name: Optional[str] = None
_node_name_map: dict[str, Node] = field(init=False, repr=False, eq=False)
_node_ind_map: dict[Node, int] = field(init=False, repr=False, eq=False)
def __attrs_post_init__(self):
"""Ensure nodes are `Node`s, edges are `Edge`s, and `Node` map is updated."""
self._convert_nodes()
self._convert_edges()
self._update_node_map(None, self.nodes)
def _convert_nodes(self):
"""Convert nodes to `Node` objects if needed."""
if isinstance(self.nodes, np.ndarray):
object.__setattr__(self, "nodes", self.nodes.tolist())
for i, node in enumerate(self.nodes):
if type(node) == str:
self.nodes[i] = Node(node)
def _convert_edges(self):
"""Convert list of edge names or integers to `Edge` objects if needed."""
if isinstance(self.edges, np.ndarray):
self.edges = self.edges.tolist()
node_names = self.node_names
for i, edge in enumerate(self.edges):
if type(edge) == Edge:
continue
src, dst = edge
if type(src) == str:
try:
src = node_names.index(src)
except ValueError:
raise ValueError(
f"Node '{src}' specified in the edge list is not in the nodes."
)
if type(src) == int or (
np.isscalar(src) and np.issubdtype(src.dtype, np.integer)
):
src = self.nodes[src]
if type(dst) == str:
try:
dst = node_names.index(dst)
except ValueError:
raise ValueError(
f"Node '{dst}' specified in the edge list is not in the nodes."
)
if type(dst) == int or (
np.isscalar(dst) and np.issubdtype(dst.dtype, np.integer)
):
dst = self.nodes[dst]
self.edges[i] = Edge(src, dst)
@property
def node_names(self) -> list[str]:
"""Names of the nodes associated with this skeleton as a list of strings."""
return [node.name for node in self.nodes]
@property
def edge_inds(self) -> list[Tuple[int, int]]:
"""Edges indices as a list of 2-tuples."""
return [
(self.nodes.index(edge.source), self.nodes.index(edge.destination))
for edge in self.edges
]
@property
def edge_names(self) -> list[str, str]:
"""Edge names as a list of 2-tuples with string node names."""
return [(edge.source.name, edge.destination.name) for edge in self.edges]
@property
def flipped_node_inds(self) -> list[int]:
"""Returns node indices that should be switched when horizontally flipping."""
flip_idx = np.arange(len(self.nodes))
if len(self.symmetries) > 0:
symmetry_inds = np.array(
[(self.index(a), self.index(b)) for a, b in self.symmetries]
)
flip_idx[symmetry_inds[:, 0]] = symmetry_inds[:, 1]
flip_idx[symmetry_inds[:, 1]] = symmetry_inds[:, 0]
flip_idx = flip_idx.tolist()
return flip_idx
def __len__(self) -> int:
"""Return the number of nodes in the skeleton."""
return len(self.nodes)
def __repr__(self) -> str:
"""Return a readable representation of the skeleton."""
nodes = ", ".join([f'"{node}"' for node in self.node_names])
return "Skeleton(" f"nodes=[{nodes}], " f"edges={self.edge_inds}" ")"
def index(self, node: Node | str) -> int:
"""Return the index of a node specified as a `Node` or string name."""
if type(node) == str:
return self.index(self._node_name_map[node])
elif type(node) == Node:
return self._node_ind_map[node]
else:
raise IndexError(f"Invalid indexing argument for skeleton: {node}")
def __getitem__(self, idx: int | str) -> Node:
"""Return a `Node` when indexing by name or integer."""
if type(idx) == int:
return self.nodes[idx]
elif type(idx) == str:
return self._node_name_map[idx]
else:
raise IndexError(f"Invalid indexing argument for skeleton: {idx}")
def add_node(self, node: Node | str):
"""Add a `Node` to the skeleton.
Args:
node: A `Node` object or a string name to create a new node.
"""
if type(node) == str:
node = Node(node)
if node not in self.nodes:
self.nodes.append(node)
self._update_node_map(None, self.nodes)
def add_edge(self, src: Edge | Node | str = None, dst: Node | str = None):
"""Add an `Edge` to the skeleton.
Args:
src: The source `Node` or name of the source node.
dst: The destination `Node` or name of the destination node.
"""
if type(src) == Edge:
edge = src
if edge not in self.edges:
self.edges.append(edge)
if edge.source not in self.nodes:
self.add_node(edge.source)
if edge.destination not in self.nodes:
self.add_node(edge.destination)
return
if type(src) == str or type(src) == Node:
try:
src = self.index(src)
except KeyError:
self.add_node(src)
src = self.index(src)
if type(dst) == str or type(dst) == Node:
try:
dst = self.index(dst)
except KeyError:
self.add_node(dst)
dst = self.index(dst)
edge = Edge(self.nodes[src], self.nodes[dst])
if edge not in self.edges:
self.edges.append(edge)
def add_symmetry(
self, node1: Symmetry | Node | str = None, node2: Node | str = None
):
"""Add a symmetry relationship to the skeleton.
Args:
node1: The first `Node` or name of the first node.
node2: The second `Node` or name of the second node.
"""
if type(node1) == Symmetry:
if node1 not in self.symmetries:
self.symmetries.append(node1)
for node in node1.nodes:
if node not in self.nodes:
self.add_node(node)
return
if type(node1) == str or type(node1) == Node:
try:
node1 = self.index(node1)
except KeyError:
self.add_node(node1)
node1 = self.index(node1)
if type(node2) == str or type(node2) == Node:
try:
node2 = self.index(node2)
except KeyError:
self.add_node(node2)
node2 = self.index(node2)
symmetry = Symmetry({self.nodes[node1], self.nodes[node2]})
if symmetry not in self.symmetries:
self.symmetries.append(symmetry)
edge_inds: list[Tuple[int, int]]
property
¶
Edge indices as a list of 2-tuples.
edge_names: list[str, str]
property
¶
Edge names as a list of 2-tuples with string node names.
flipped_node_inds: list[int]
property
¶
Returns node indices that should be switched when horizontally flipping.
node_names: list[str]
property
¶
Names of the nodes associated with this skeleton as a list of strings.
__attrs_post_init__()
¶
Ensure nodes are `Node`s, edges are `Edge`s, and the `Node` map is updated.
__getitem__(idx)
¶
Return a `Node` when indexing by name or integer.
Source code in sleap_io/model/skeleton.py
def __getitem__(self, idx: int | str) -> Node:
    """Return a `Node` when indexing by name or integer."""
    if type(idx) == int:
        return self.nodes[idx]
    elif type(idx) == str:
        return self._node_name_map[idx]
    else:
        raise IndexError(f"Invalid indexing argument for skeleton: {idx}")
__len__()
¶
Return the number of nodes in the skeleton.
__repr__()
¶
Return a readable representation of the skeleton.
add_edge(src=None, dst=None)
¶
Add an `Edge` to the skeleton.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Edge \| Node \| str | The source `Node` or name of the source node. | None |
dst | Node \| str | The destination `Node` or name of the destination node. | None |
Source code in sleap_io/model/skeleton.py
def add_edge(self, src: Edge | Node | str = None, dst: Node | str = None):
"""Add an `Edge` to the skeleton.
Args:
src: The source `Node` or name of the source node.
dst: The destination `Node` or name of the destination node.
"""
if type(src) == Edge:
edge = src
if edge not in self.edges:
self.edges.append(edge)
if edge.source not in self.nodes:
self.add_node(edge.source)
if edge.destination not in self.nodes:
self.add_node(edge.destination)
return
if type(src) == str or type(src) == Node:
try:
src = self.index(src)
except KeyError:
self.add_node(src)
src = self.index(src)
if type(dst) == str or type(dst) == Node:
try:
dst = self.index(dst)
except KeyError:
self.add_node(dst)
dst = self.index(dst)
edge = Edge(self.nodes[src], self.nodes[dst])
if edge not in self.edges:
self.edges.append(edge)
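For illustration, a sketch of building a skeleton incrementally; note that `add_edge()` adds any source or destination node that is not already in the skeleton (node names here are invented):
from sleap_io import Skeleton

skeleton = Skeleton()
skeleton.add_node("head")
skeleton.add_edge("head", "thorax")     # "thorax" is created on the fly
skeleton.add_edge("thorax", "abdomen")  # so is "abdomen"

print(skeleton.node_names)  # ['head', 'thorax', 'abdomen']
print(skeleton.edge_names)  # [('head', 'thorax'), ('thorax', 'abdomen')]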
add_node(node)
¶
Add a `Node` to the skeleton.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node | Node \| str | A `Node` object or a string name to create a new node. | required |
Source code in sleap_io/model/skeleton.py
def add_node(self, node: Node | str):
    """Add a `Node` to the skeleton.
    Args:
        node: A `Node` object or a string name to create a new node.
    """
    if type(node) == str:
        node = Node(node)
    if node not in self.nodes:
        self.nodes.append(node)
        self._update_node_map(None, self.nodes)
add_symmetry(node1=None, node2=None)
¶
Add a symmetry relationship to the skeleton.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node1 | Symmetry \| Node \| str | The first `Node` or name of the first node. | None |
node2 | Node \| str | The second `Node` or name of the second node. | None |
Source code in sleap_io/model/skeleton.py
def add_symmetry(
self, node1: Symmetry | Node | str = None, node2: Node | str = None
):
"""Add a symmetry relationship to the skeleton.
Args:
node1: The first `Node` or name of the first node.
node2: The second `Node` or name of the second node.
"""
if type(node1) == Symmetry:
if node1 not in self.symmetries:
self.symmetries.append(node1)
for node in node1.nodes:
if node not in self.nodes:
self.add_node(node)
return
if type(node1) == str or type(node1) == Node:
try:
node1 = self.index(node1)
except KeyError:
self.add_node(node1)
node1 = self.index(node1)
if type(node2) == str or type(node2) == Node:
try:
node2 = self.index(node2)
except KeyError:
self.add_node(node2)
node2 = self.index(node2)
symmetry = Symmetry({self.nodes[node1], self.nodes[node2]})
if symmetry not in self.symmetries:
self.symmetries.append(symmetry)
index(node)
¶
Return the index of a node specified as a `Node` or string name.
Source code in sleap_io/model/skeleton.py
def index(self, node: Node | str) -> int:
"""Return the index of a node specified as a `Node` or string name."""
if type(node) == str:
return self.index(self._node_name_map[node])
elif type(node) == Node:
return self._node_ind_map[node]
else:
raise IndexError(f"Invalid indexing argument for skeleton: {node}")
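To tie the `Skeleton` API together, a hedged end-to-end sketch of constructing and querying a skeleton (node names invented; assumes a top-level `sleap_io.Skeleton` import):
from sleap_io import Skeleton

skeleton = Skeleton(
    nodes=["head", "left_ear", "right_ear", "tail"],
    edges=[("head", "left_ear"), ("head", "right_ear"), ("head", "tail")],
    name="mouse",
)
skeleton.add_symmetry("left_ear", "right_ear")

print(len(skeleton))               # 4
print(skeleton.index("tail"))      # 3
print(skeleton["head"])            # the Node named "head"
print(skeleton.edge_inds)          # [(0, 1), (0, 2), (0, 3)]
print(skeleton.flipped_node_inds)  # [0, 2, 1, 3]: the ears swap under a flip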
sleap_io.Node
¶
A landmark type within a `Skeleton`.
This typically corresponds to a unique landmark within a skeleton, such as the "left eye".
Attributes:
Name | Type | Description |
---|---|---|
name | str | Descriptive label for the landmark. |
Source code in sleap_io/model/skeleton.py
sleap_io.Edge
¶
A connection between two `Node` objects within a `Skeleton`.
This is a directed edge, representing the ordering of `Node`s in the `Skeleton` tree.
Attributes:
Name | Type | Description |
---|---|---|
source | Node | The origin `Node`. |
destination | Node | The destination `Node`. |
Source code in sleap_io/model/skeleton.py
@define(frozen=True)
class Edge:
"""A connection between two `Node` objects within a `Skeleton`.
This is a directed edge, representing the ordering of `Node`s in the `Skeleton`
tree.
Attributes:
source: The origin `Node`.
destination: The destination `Node`.
"""
source: Node
destination: Node
def __getitem__(self, idx) -> Node:
"""Return the source `Node` (`idx` is 0) or destination `Node` (`idx` is 1)."""
if idx == 0:
return self.source
elif idx == 1:
return self.destination
else:
raise IndexError("Edge only has 2 nodes (source and destination).")
__getitem__(idx)
¶
Return the source `Node` (`idx` is 0) or destination `Node` (`idx` is 1).
Source code in sleap_io/model/skeleton.py
def __getitem__(self, idx) -> Node:
    """Return the source `Node` (`idx` is 0) or destination `Node` (`idx` is 1)."""
    if idx == 0:
        return self.source
    elif idx == 1:
        return self.destination
    else:
        raise IndexError("Edge only has 2 nodes (source and destination).")
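A minimal sketch of directed edges and positional indexing (node names invented):
from sleap_io import Edge, Node

head, thorax = Node("head"), Node("thorax")
edge = Edge(source=head, destination=thorax)

print(edge[0] is head)    # True: index 0 is the source
print(edge[1] is thorax)  # True: index 1 is the destination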
sleap_io.Symmetry
¶
A relationship between a pair of nodes denoting their left/right pairing.
Attributes:
Name | Type | Description |
---|---|---|
nodes | set[Node] | A set of two `Node`s. |
Source code in sleap_io/model/skeleton.py
@define
class Symmetry:
"""A relationship between a pair of nodes denoting their left/right pairing.
Attributes:
nodes: A set of two `Node`s.
"""
nodes: set[Node] = field(converter=set, validator=lambda _, __, val: len(val) == 2)
def __iter__(self):
"""Iterate over the symmetric nodes."""
return iter(self.nodes)
def __getitem__(self, idx) -> Node:
"""Return the first node."""
for i, node in enumerate(self.nodes):
if i == idx:
return node
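A minimal sketch; a `Symmetry` holds exactly two nodes and is iterable (node names invented):
from sleap_io import Node, Symmetry

left, right = Node("left_ear"), Node("right_ear")
sym = Symmetry({left, right})

print(sorted(node.name for node in sym))  # ['left_ear', 'right_ear']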
sleap_io.Track
¶
An object that represents the same animal/object across multiple detections.
This allows tracking of unique entities in the video over time and space.
A `Track` may also be used to refer to unique identity classes that span multiple videos, such as `"female mouse"`.
Attributes:
Name | Type | Description |
---|---|---|
name | str | A name given to this track for identification purposes. |
Notes
`Track`s are compared by identity. This means that unique track objects with the same name are considered to be different.
Source code in sleap_io/model/instance.py
@define(eq=False)
class Track:
"""An object that represents the same animal/object across multiple detections.
This allows tracking of unique entities in the video over time and space.
A `Track` may also be used to refer to unique identity classes that span multiple
videos, such as `"female mouse"`.
Attributes:
name: A name given to this track for identification purposes.
Notes:
`Track`s are compared by identity. This means that unique track objects with the
same name are considered to be different.
"""
name: str = ""
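To illustrate the identity-based comparison described in the notes above (track names invented):
from sleap_io import Track

track_a = Track(name="female mouse")
track_b = Track(name="female mouse")

# Tracks are compared by identity, so equal names do not imply equal tracks.
print(track_a == track_b)  # False: two distinct Track objects
print(track_a == track_a)  # True: same object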
sleap_io.Video
¶
`Video` class used by SLEAP to represent videos and data associated with them.
This class stores information regarding a video and its components: the video's `filename`, `shape`, and `backend`.
To create a `Video` object, use the `from_filename` method, which will select the backend appropriately.
Attributes:
Name | Type | Description |
---|---|---|
filename | str \| list[str] | The filename(s) of the video. Supported extensions: "mp4", "avi", "mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif", "tiff", "bmp". If the filename is a list, a list of image filenames is expected. If the filename is a folder, it will be searched for images. |
backend | Optional[VideoBackend] | An object that implements the basic methods for reading and manipulating frames of a specific video type. |
backend_metadata | dict[str, any] | A dictionary of metadata specific to the backend. This is useful for storing metadata that requires an open backend (e.g., shape information) without having access to the video file itself. |
source_video | Optional[Video] | The source video object if this is a proxy video. This is present when the video contains an embedded subset of frames from another video. |
Notes
Instances of this class are hashed by identity, not by value. This means that two `Video` instances with the same attributes will NOT be considered equal in a set or dict.
See also: VideoBackend
Source code in sleap_io/model/video.py
@attrs.define(eq=False)
class Video:
"""`Video` class used by sleap to represent videos and data associated with them.
This class is used to store information regarding a video and its components.
It is used to store the video's `filename`, `shape`, and the video's `backend`.
To create a `Video` object, use the `from_filename` method which will select the
backend appropriately.
Attributes:
filename: The filename(s) of the video. Supported extensions: "mp4", "avi",
"mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif",
"tiff", "bmp". If the filename is a list, a list of image filenames are
expected. If filename is a folder, it will be searched for images.
backend: An object that implements the basic methods for reading and
manipulating frames of a specific video type.
backend_metadata: A dictionary of metadata specific to the backend. This is
useful for storing metadata that requires an open backend (e.g., shape
information) without having access to the video file itself.
source_video: The source video object if this is a proxy video. This is present
when the video contains an embedded subset of frames from another video.
Notes:
Instances of this class are hashed by identity, not by value. This means that
two `Video` instances with the same attributes will NOT be considered equal in a
set or dict.
See also: VideoBackend
"""
filename: str | list[str]
backend: Optional[VideoBackend] = None
backend_metadata: dict[str, any] = attrs.field(factory=dict)
source_video: Optional[Video] = None
EXTS = MediaVideo.EXTS + HDF5Video.EXTS + ImageVideo.EXTS
def __attrs_post_init__(self):
"""Post init syntactic sugar."""
if self.backend is None and self.exists():
self.open()
@classmethod
def from_filename(
cls,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
source_video: Optional[Video] = None,
**kwargs,
) -> VideoBackend:
"""Create a Video from a filename.
Args:
filename: The filename(s) of the video. Supported extensions: "mp4", "avi",
"mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif",
"tiff", "bmp". If the filename is a list, a list of image filenames are
expected. If filename is a folder, it will be searched for images.
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
source_video: The source video object if this is a proxy video. This is
present when the video contains an embedded subset of frames from
another video.
Returns:
Video instance with the appropriate backend instantiated.
"""
return cls(
filename=filename,
backend=VideoBackend.from_filename(
filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
**kwargs,
),
source_video=source_video,
)
@property
def shape(self) -> Tuple[int, int, int, int] | None:
"""Return the shape of the video as (num_frames, height, width, channels).
If the video backend is not set or it cannot determine the shape of the video,
this will return None.
"""
return self._get_shape()
def _get_shape(self) -> Tuple[int, int, int, int] | None:
"""Return the shape of the video as (num_frames, height, width, channels).
This suppresses errors related to querying the backend for the video shape, such
as when it has not been set or when the video file is not found.
"""
try:
return self.backend.shape
except:
if "shape" in self.backend_metadata:
return self.backend_metadata["shape"]
return None
@property
def grayscale(self) -> bool | None:
"""Return whether the video is grayscale.
If the video backend is not set or it cannot determine whether the video is
grayscale, this will return None.
"""
shape = self.shape
if shape is not None:
return shape[-1] == 1
else:
return self.backend_metadata.get("grayscale", None)
@grayscale.setter
def grayscale(self, value: bool):
"""Set the grayscale value and adjust the backend."""
if self.backend is not None:
self.backend.grayscale = value
self.backend._cached_shape = None
self.backend_metadata["grayscale"] = value
def __len__(self) -> int:
"""Return the length of the video as the number of frames."""
shape = self.shape
return 0 if shape is None else shape[0]
def __repr__(self) -> str:
"""Informal string representation (for print or format)."""
dataset = (
f"dataset={self.backend.dataset}, "
if getattr(self.backend, "dataset", "")
else ""
)
return (
"Video("
f'filename="{self.filename}", '
f"shape={self.shape}, "
f"{dataset}"
f"backend={type(self.backend).__name__}"
")"
)
def __str__(self) -> str:
"""Informal string representation (for print or format)."""
return self.__repr__()
def __getitem__(self, inds: int | list[int] | slice) -> np.ndarray:
"""Return the frames of the video at the given indices.
Args:
inds: Index or list of indices of frames to read.
Returns:
Frame or frames as a numpy array of shape `(height, width, channels)` if a
scalar index is provided, or `(frames, height, width, channels)` if a list
of indices is provided.
See also: VideoBackend.get_frame, VideoBackend.get_frames
"""
if not self.is_open:
self.open()
return self.backend[inds]
def exists(self, check_all: bool = False) -> bool:
"""Check if the video file exists.
Args:
check_all: If `True`, check that all filenames in a list exist. If `False`
(the default), check that the first filename exists.
"""
if isinstance(self.filename, list):
if check_all:
for f in self.filename:
if not Path(f).exists():
return False
return True
else:
return Path(self.filename[0]).exists()
return Path(self.filename).exists()
@property
def is_open(self) -> bool:
"""Check if the video backend is open."""
return self.exists() and self.backend is not None
def open(
self,
dataset: Optional[str] = None,
grayscale: Optional[str] = None,
keep_open: bool = True,
):
"""Open the video backend for reading.
Args:
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
Notes:
This is useful for opening the video backend to read frames and then closing
it after reading all the necessary frames.
If the backend was already open, it will be closed before opening a new one.
Values for the HDF5 dataset and grayscale will be remembered if not
specified.
"""
if not self.exists():
raise FileNotFoundError(f"Video file not found: {self.filename}")
# Try to remember values from previous backend if available and not specified.
if self.backend is not None:
if dataset is None:
dataset = getattr(self.backend, "dataset", None)
if grayscale is None:
grayscale = getattr(self.backend, "grayscale", None)
else:
if dataset is None and "dataset" in self.backend_metadata:
dataset = self.backend_metadata["dataset"]
if grayscale is None and "grayscale" in self.backend_metadata:
grayscale = self.backend_metadata["grayscale"]
# Close previous backend if open.
self.close()
# Create new backend.
self.backend = VideoBackend.from_filename(
self.filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
)
def close(self):
"""Close the video backend."""
if self.backend is not None:
del self.backend
self.backend = None
def replace_filename(
self, new_filename: str | Path | list[str] | list[Path], open: bool = True
):
"""Update the filename of the video, optionally opening the backend.
Args:
new_filename: New filename to set for the video.
open: If `True` (the default), open the backend with the new filename. If
the new filename does not exist, no error is raised.
"""
if isinstance(new_filename, Path):
new_filename = new_filename.as_posix()
if isinstance(new_filename, list):
new_filename = [
p.as_posix() if isinstance(p, Path) else p for p in new_filename
]
self.filename = new_filename
if open:
if self.exists():
self.open()
else:
self.close()
grayscale: bool | None
property
writable
¶
Return whether the video is grayscale.
If the video backend is not set or it cannot determine whether the video is grayscale, this will return None.
is_open: bool
property
¶
Check if the video backend is open.
shape: Tuple[int, int, int, int] | None
property
¶
Return the shape of the video as (num_frames, height, width, channels).
If the video backend is not set or it cannot determine the shape of the video, this will return None.
__attrs_post_init__()
¶
Post init syntactic sugar.
__getitem__(inds)
¶
Return the frames of the video at the given indices.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inds | int \| list[int] \| slice | Index or list of indices of frames to read. | required |
Returns:
Type | Description |
---|---|
ndarray | Frame or frames as a numpy array of shape `(height, width, channels)` if a scalar index is provided, or `(frames, height, width, channels)` if a list of indices is provided. |
See also: VideoBackend.get_frame, VideoBackend.get_frames
Source code in sleap_io/model/video.py
def __getitem__(self, inds: int | list[int] | slice) -> np.ndarray:
"""Return the frames of the video at the given indices.
Args:
inds: Index or list of indices of frames to read.
Returns:
Frame or frames as a numpy array of shape `(height, width, channels)` if a
scalar index is provided, or `(frames, height, width, channels)` if a list
of indices is provided.
See also: VideoBackend.get_frame, VideoBackend.get_frames
"""
if not self.is_open:
self.open()
return self.backend[inds]
__len__()
¶
Return the length of the video as the number of frames.
__repr__()
¶
Informal string representation (for print or format).
Source code in sleap_io/model/video.py
def __repr__(self) -> str:
"""Informal string representation (for print or format)."""
dataset = (
f"dataset={self.backend.dataset}, "
if getattr(self.backend, "dataset", "")
else ""
)
return (
"Video("
f'filename="{self.filename}", '
f"shape={self.shape}, "
f"{dataset}"
f"backend={type(self.backend).__name__}"
")"
)
__str__()
¶
Informal string representation (for print or format).
close()
¶
Close the video backend.
exists(check_all=False)
¶
Check if the video file exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
check_all | bool | If `True`, check that all filenames in a list exist. If `False` (the default), check that the first filename exists. | False |
Source code in sleap_io/model/video.py
def exists(self, check_all: bool = False) -> bool:
"""Check if the video file exists.
Args:
check_all: If `True`, check that all filenames in a list exist. If `False`
(the default), check that the first filename exists.
"""
if isinstance(self.filename, list):
if check_all:
for f in self.filename:
if not Path(f).exists():
return False
return True
else:
return Path(self.filename[0]).exists()
return Path(self.filename).exists()
from_filename(filename, dataset=None, grayscale=None, keep_open=True, source_video=None, **kwargs)
classmethod
¶
Create a Video from a filename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str \| list[str] | The filename(s) of the video. Supported extensions: "mp4", "avi", "mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif", "tiff", "bmp". If the filename is a list, a list of image filenames is expected. If the filename is a folder, it will be searched for images. | required |
dataset | Optional[str] | Name of dataset in HDF5 file. | None |
grayscale | Optional[bool] | Whether to force grayscale. If None, autodetect on first frame load. | None |
keep_open | bool | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. | True |
source_video | Optional[Video] | The source video object if this is a proxy video. This is present when the video contains an embedded subset of frames from another video. | None |
Returns:
Type | Description |
---|---|
VideoBackend | Video instance with the appropriate backend instantiated. |
Source code in sleap_io/model/video.py
@classmethod
def from_filename(
cls,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
source_video: Optional[Video] = None,
**kwargs,
) -> VideoBackend:
"""Create a Video from a filename.
Args:
filename: The filename(s) of the video. Supported extensions: "mp4", "avi",
"mov", "mj2", "mkv", "h5", "hdf5", "slp", "png", "jpg", "jpeg", "tif",
"tiff", "bmp". If the filename is a list, a list of image filenames are
expected. If filename is a folder, it will be searched for images.
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
source_video: The source video object if this is a proxy video. This is
present when the video contains an embedded subset of frames from
another video.
Returns:
Video instance with the appropriate backend instantiated.
"""
return cls(
filename=filename,
backend=VideoBackend.from_filename(
filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
**kwargs,
),
source_video=source_video,
)
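A hedged usage sketch of creating and reading from a `Video`; `"video.mp4"` is a placeholder path and should point to a real file for the backend and frame reads to work:
from sleap_io import Video

video = Video.from_filename("video.mp4")  # backend chosen from the extension

print(video.shape)  # (num_frames, height, width, channels), or None if unknown
print(len(video))   # number of frames (0 if the shape cannot be determined)

frame = video[0]         # single frame: (height, width, channels)
clip = video[[0, 1, 2]]  # multiple frames: (frames, height, width, channels)
print(frame.shape, clip.shape)

video.close()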
open(dataset=None, grayscale=None, keep_open=True)
¶
Open the video backend for reading.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset | Optional[str] | Name of dataset in HDF5 file. | None |
grayscale | Optional[str] | Whether to force grayscale. If None, autodetect on first frame load. | None |
keep_open | bool | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. | True |
Notes
This is useful for opening the video backend to read frames and then closing it after reading all the necessary frames.
If the backend was already open, it will be closed before opening a new one. Values for the HDF5 dataset and grayscale will be remembered if not specified.
Source code in sleap_io/model/video.py
def open(
self,
dataset: Optional[str] = None,
grayscale: Optional[str] = None,
keep_open: bool = True,
):
"""Open the video backend for reading.
Args:
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
Notes:
This is useful for opening the video backend to read frames and then closing
it after reading all the necessary frames.
If the backend was already open, it will be closed before opening a new one.
Values for the HDF5 dataset and grayscale will be remembered if not
specified.
"""
if not self.exists():
raise FileNotFoundError(f"Video file not found: {self.filename}")
# Try to remember values from previous backend if available and not specified.
if self.backend is not None:
if dataset is None:
dataset = getattr(self.backend, "dataset", None)
if grayscale is None:
grayscale = getattr(self.backend, "grayscale", None)
else:
if dataset is None and "dataset" in self.backend_metadata:
dataset = self.backend_metadata["dataset"]
if grayscale is None and "grayscale" in self.backend_metadata:
grayscale = self.backend_metadata["grayscale"]
# Close previous backend if open.
self.close()
# Create new backend.
self.backend = VideoBackend.from_filename(
self.filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
)
replace_filename(new_filename, open=True)
¶
Update the filename of the video, optionally opening the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_filename | str \| Path \| list[str] \| list[Path] | New filename to set for the video. | required |
open | bool | If `True` (the default), open the backend with the new filename. If the new filename does not exist, no error is raised. | True |
Source code in sleap_io/model/video.py
def replace_filename(
self, new_filename: str | Path | list[str] | list[Path], open: bool = True
):
"""Update the filename of the video, optionally opening the backend.
Args:
new_filename: New filename to set for the video.
open: If `True` (the default), open the backend with the new filename. If
the new filename does not exist, no error is raised.
"""
if isinstance(new_filename, Path):
new_filename = new_filename.as_posix()
if isinstance(new_filename, list):
new_filename = [
p.as_posix() if isinstance(p, Path) else p for p in new_filename
]
self.filename = new_filename
if open:
if self.exists():
self.open()
else:
self.close()
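Finally, a sketch of re-pointing a `Video` at a moved file; the paths below are placeholders, not real files:
from pathlib import Path
from sleap_io import Video

# Constructing a Video directly does not require the file to exist; the backend
# is only opened automatically when the file is found on disk.
video = Video(filename="session1/video.mp4")

# Point the Video at the file's new location. With open=True (the default),
# the backend is reopened if the new file exists.
video.replace_filename(Path("/data/archive/session1/video.mp4"))

# Or update the path without opening it (e.g., when preparing labels for
# another machine):
video.replace_filename("D:/exports/session1/video.mp4", open=False)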