video_reading
sleap_io.io.video_reading ¶
Backends for reading videos.
Classes:
Name | Description |
---|---|
`HDF5Video` | Video backend for reading videos stored in HDF5 files. |
`ImageVideo` | Video backend for reading videos stored as image files. |
`MediaVideo` | Video backend for reading videos stored as common media files. |
`VideoBackend` | Base class for video backends. |
HDF5Video ¶
Bases: VideoBackend
Video backend for reading videos stored in HDF5 files.
This backend supports reading videos stored in HDF5 files, both in rank-4 datasets and in datasets with lists of binary-encoded images.
Embedded image datasets are used in SLEAP when exporting package files (`.pkg.slp`) with videos embedded in them. This is useful for bundling training or inference data without having to worry about the videos (or frame images) being moved or deleted.
It is expected that these types of datasets will be in a `Group` with an `int8` variable-length dataset called `"video"`. This dataset must also contain an attribute called `"format"` with a string describing the image format (e.g., "png" or "jpg") which will be used to decode it appropriately.
If a `frame_numbers` dataset is present in the group, it will be used to map from source video frames to the frames in the dataset. This is useful to preserve frame indexing when exporting a subset of frames in the video. It will also be used to populate the `frame_map` and `source_inds` attributes.
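For orientation, here is a minimal sketch of how such an embedded group can be inspected with `h5py`. The path `labels.pkg.slp` and the group name `video0` are placeholders for illustration:

```python
import h5py

# Inspect the layout described above; all names here are hypothetical examples.
with h5py.File("labels.pkg.slp", "r") as f:
    ds = f["video0/video"]  # variable-length int8 dataset of encoded images
    print(ds.attrs["format"])  # e.g., "png" or "jpg"
    if "frame_numbers" in ds.parent:
        # Source-video frame indices corresponding to each embedded image.
        print(ds.parent["frame_numbers"][:])
```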
Attributes:
Name | Type | Description |
---|---|---|
filename | | Path to HDF5 file (.h5, .hdf5 or .slp). |
grayscale | | Whether to force grayscale. If None, autodetect on first frame load. |
keep_open | | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. |
dataset | Optional[str] | Name of dataset to read from. If `None`, will try to find a rank-4 dataset by iterating through datasets in the file. If specifying an embedded dataset, this can be the group containing a "video" dataset or the dataset itself (e.g., "video0" or "video0/video"). |
input_format | str | Format of the data in the dataset. One of "channels_last" (the default) in `(frames, height, width, channels)` order or "channels_first" in `(frames, channels, width, height)` order. Embedded datasets should use the "channels_last" format. |
frame_map | dict[int, int] | Mapping from frame indices to indices in the dataset. This is used to translate between the frame indices of the images within their source video and the indices of the images in the dataset. This is only used when reading embedded image datasets. |
source_filename | Optional[str] | Path to the source video file. This is metadata and only used when reading embedded image datasets. |
source_inds | Optional[ndarray] | Indices of the frames in the source video file. This is metadata and only used when reading embedded image datasets. |
image_format | str | Format of the images in the embedded dataset. This is metadata and only used when reading embedded image datasets. |
Methods:
Name | Description |
---|---|
`__attrs_post_init__` | Auto-detect dataset and frame map heuristically. |
`decode_embedded` | Decode an embedded image string into a numpy array. |
`has_frame` | Check if a frame index is contained in the video. |
`read_test_frame` | Read a single frame from the video to test for grayscale. |
Attributes:
Name | Type | Description |
---|---|---|
embedded_frame_inds | list[int] | Return the frame indices of the embedded images. |
has_embedded_images | bool | Return True if the dataset contains embedded images. |
img_shape | Tuple[int, int, int] | Shape of a single frame in the video as `(height, width, channels)`. |
num_frames | int | Number of frames in the video. |
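A minimal usage sketch (the filename and group name are placeholders; frame reads go through the inherited `get_frame`/`get_frames` methods):

```python
from sleap_io.io.video_reading import HDF5Video

# Hypothetical package file with an embedded video under group "video0".
backend = HDF5Video("labels.pkg.slp", dataset="video0")
if backend.has_embedded_images:
    inds = backend.embedded_frame_inds  # frame indices in the source video
    img = backend.get_frame(inds[0])    # (height, width, channels)
```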
Source code in sleap_io/io/video_reading.py
@attrs.define
class HDF5Video(VideoBackend):
"""Video backend for reading videos stored in HDF5 files.
This backend supports reading videos stored in HDF5 files, both in rank-4 datasets
as well as in datasets with lists of binary-encoded images.
Embedded image datasets are used in SLEAP when exporting package files (`.pkg.slp`)
with videos embedded in them. This is useful for bundling training or inference data
without having to worry about the videos (or frame images) being moved or deleted.
It is expected that these types of datasets will be in a `Group` with a `int8`
variable length dataset called `"video"`. This dataset must also contain an
attribute called "format" with a string describing the image format (e.g., "png" or
"jpg") which will be used to decode it appropriately.
If a `frame_numbers` dataset is present in the group, it will be used to map from
source video frames to the frames in the dataset. This is useful to preserve frame
indexing when exporting a subset of frames in the video. It will also be used to
populate `frame_map` and `source_inds` attributes.
Attributes:
filename: Path to HDF5 file (.h5, .hdf5 or .slp).
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
keep_open: Whether to keep the video reader open between calls to read frames.
If False, will close the reader after each call. If True (the default), it
will keep the reader open and cache it for subsequent calls which may
enhance the performance of reading multiple frames.
dataset: Name of dataset to read from. If `None`, will try to find a rank-4
dataset by iterating through datasets in the file. If specifying an embedded
dataset, this can be the group containing a "video" dataset or the dataset
itself (e.g., "video0" or "video0/video").
input_format: Format of the data in the dataset. One of "channels_last" (the
default) in `(frames, height, width, channels)` order or "channels_first" in
`(frames, channels, width, height)` order. Embedded datasets should use the
"channels_last" format.
frame_map: Mapping from frame indices to indices in the dataset. This is used to
translate between the frame indices of the images within their source video
and the indices of the images in the dataset. This is only used when reading
embedded image datasets.
source_filename: Path to the source video file. This is metadata and only used
when reading embedded image datasets.
source_inds: Indices of the frames in the source video file. This is metadata
and only used when reading embedded image datasets.
image_format: Format of the images in the embedded dataset. This is metadata and
only used when reading embedded image datasets.
"""
dataset: Optional[str] = None
input_format: str = attrs.field(
default="channels_last",
validator=attrs.validators.in_(["channels_last", "channels_first"]),
)
frame_map: dict[int, int] = attrs.field(init=False, default=attrs.Factory(dict))
source_filename: Optional[str] = None
source_inds: Optional[np.ndarray] = None
image_format: str = "hdf5"
EXTS = ("h5", "hdf5", "slp")
def __attrs_post_init__(self):
"""Auto-detect dataset and frame map heuristically."""
# Check if the file is accessible before applying heuristics.
try:
f = h5py.File(self.filename, "r")
except OSError:
return
if self.dataset is None:
# Iterate through datasets to find a rank 4 array.
def find_movies(name, obj):
if isinstance(obj, h5py.Dataset) and obj.ndim == 4:
self.dataset = name
return True
f.visititems(find_movies)
if self.dataset is None:
# Iterate through datasets to find an embedded video dataset.
def find_embedded(name, obj):
if isinstance(obj, h5py.Dataset) and name.endswith("/video"):
self.dataset = name
return True
f.visititems(find_embedded)
if self.dataset is None:
# Couldn't find video datasets.
return
if isinstance(f[self.dataset], h5py.Group):
# If this is a group, assume it's an embedded video dataset.
if "video" in f[self.dataset]:
self.dataset = f"{self.dataset}/video"
if self.dataset.split("/")[-1] == "video":
# This may be an embedded video dataset. Check for frame map.
ds = f[self.dataset]
if "format" in ds.attrs:
self.image_format = ds.attrs["format"]
if "frame_numbers" in ds.parent:
frame_numbers = ds.parent["frame_numbers"][:].astype(int)
self.frame_map = {frame: idx for idx, frame in enumerate(frame_numbers)}
self.source_inds = frame_numbers
if "source_video" in ds.parent:
self.source_filename = json.loads(
ds.parent["source_video"].attrs["json"]
)["backend"]["filename"]
f.close()
@property
def num_frames(self) -> int:
"""Number of frames in the video."""
with h5py.File(self.filename, "r") as f:
return f[self.dataset].shape[0]
@property
def img_shape(self) -> Tuple[int, int, int]:
"""Shape of a single frame in the video as `(height, width, channels)`."""
with h5py.File(self.filename, "r") as f:
ds = f[self.dataset]
img_shape = None
if "height" in ds.attrs:
# Try to get shape from the attributes.
img_shape = (
ds.attrs["height"],
ds.attrs["width"],
ds.attrs["channels"],
)
if img_shape[0] == 0 or img_shape[1] == 0:
# Invalidate the shape if the attributes are zero.
img_shape = None
if img_shape is None and self.image_format == "hdf5" and ds.ndim == 4:
# Use the dataset shape if just stored as a rank-4 array.
img_shape = ds.shape[1:]
if self.input_format == "channels_first":
img_shape = img_shape[::-1]
if img_shape is None:
# Fall back to reading a test frame.
return super().img_shape
return int(img_shape[0]), int(img_shape[1]), int(img_shape[2])
def read_test_frame(self) -> np.ndarray:
"""Read a single frame from the video to test for grayscale."""
if self.frame_map:
frame_idx = list(self.frame_map.keys())[0]
else:
frame_idx = 0
return self._read_frame(frame_idx)
@property
def has_embedded_images(self) -> bool:
"""Return True if the dataset contains embedded images."""
return self.image_format is not None and self.image_format != "hdf5"
@property
def embedded_frame_inds(self) -> list[int]:
"""Return the frame indices of the embedded images."""
return list(self.frame_map.keys())
def decode_embedded(self, img_string: np.ndarray) -> np.ndarray:
"""Decode an embedded image string into a numpy array.
Args:
img_string: Binary string of the image as a `int8` numpy vector with the
bytes as values corresponding to the format-encoded image.
Returns:
The decoded image as a numpy array of shape `(height, width, channels)`. If
a rank-2 image is decoded, it will be expanded such that channels will be 1.
This method does not apply grayscale conversion as per the `grayscale`
attribute. Use the `get_frame` or `get_frames` methods of the `VideoBackend`
to apply grayscale conversion rather than calling this function directly.
"""
if "cv2" in sys.modules:
img = cv2.imdecode(img_string, cv2.IMREAD_UNCHANGED)
else:
img = iio.imread(BytesIO(img_string), extension=f".{self.image_format}")
if img.ndim == 2:
img = np.expand_dims(img, axis=-1)
return img
def has_frame(self, frame_idx: int) -> bool:
"""Check if a frame index is contained in the video.
Args:
frame_idx: Index of frame to check.
Returns:
`True` if the index is contained in the video, otherwise `False`.
"""
if self.frame_map:
return frame_idx in self.frame_map
else:
return frame_idx < len(self)
def _read_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.
Args:
frame_idx: Index of frame to read.
Returns:
The frame as a numpy array of shape `(height, width, channels)`.
Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frame` method of the `VideoBackend` class instead.
"""
if self.keep_open:
if self._open_reader is None:
self._open_reader = h5py.File(self.filename, "r")
f = self._open_reader
else:
f = h5py.File(self.filename, "r")
ds = f[self.dataset]
if self.frame_map:
frame_idx = self.frame_map[frame_idx]
img = ds[frame_idx]
if self.has_embedded_images:
img = self.decode_embedded(img)
if self.input_format == "channels_first":
img = np.transpose(img, (2, 1, 0))
if not self.keep_open:
f.close()
return img
def _read_frames(self, frame_inds: list) -> np.ndarray:
"""Read a list of frames from the video.
Args:
frame_inds: List of indices of frames to read.
Returns:
The frame as a numpy array of shape `(frames, height, width, channels)`.
Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frames` method of the `VideoBackend` class instead.
"""
if self.keep_open:
if self._open_reader is None:
self._open_reader = h5py.File(self.filename, "r")
f = self._open_reader
else:
f = h5py.File(self.filename, "r")
if self.frame_map:
frame_inds = [self.frame_map[idx] for idx in frame_inds]
ds = f[self.dataset]
imgs = ds[frame_inds]
if "format" in ds.attrs:
imgs = np.stack(
[self.decode_embedded(img) for img in imgs],
axis=0,
)
if self.input_format == "channels_first":
imgs = np.transpose(imgs, (0, 3, 2, 1))
if not self.keep_open:
f.close()
return imgs
embedded_frame_inds: list[int] property ¶
Return the frame indices of the embedded images.
has_embedded_images: bool property ¶
Return True if the dataset contains embedded images.
img_shape: Tuple[int, int, int] property ¶
Shape of a single frame in the video as `(height, width, channels)`.
num_frames: int property ¶
Number of frames in the video.
__attrs_post_init__() ¶
Auto-detect dataset and frame map heuristically.
Source code in sleap_io/io/video_reading.py
def __attrs_post_init__(self):
"""Auto-detect dataset and frame map heuristically."""
# Check if the file is accessible before applying heuristics.
try:
f = h5py.File(self.filename, "r")
except OSError:
return
if self.dataset is None:
# Iterate through datasets to find a rank 4 array.
def find_movies(name, obj):
if isinstance(obj, h5py.Dataset) and obj.ndim == 4:
self.dataset = name
return True
f.visititems(find_movies)
if self.dataset is None:
# Iterate through datasets to find an embedded video dataset.
def find_embedded(name, obj):
if isinstance(obj, h5py.Dataset) and name.endswith("/video"):
self.dataset = name
return True
f.visititems(find_embedded)
if self.dataset is None:
# Couldn't find video datasets.
return
if isinstance(f[self.dataset], h5py.Group):
# If this is a group, assume it's an embedded video dataset.
if "video" in f[self.dataset]:
self.dataset = f"{self.dataset}/video"
if self.dataset.split("/")[-1] == "video":
# This may be an embedded video dataset. Check for frame map.
ds = f[self.dataset]
if "format" in ds.attrs:
self.image_format = ds.attrs["format"]
if "frame_numbers" in ds.parent:
frame_numbers = ds.parent["frame_numbers"][:].astype(int)
self.frame_map = {frame: idx for idx, frame in enumerate(frame_numbers)}
self.source_inds = frame_numbers
if "source_video" in ds.parent:
self.source_filename = json.loads(
ds.parent["source_video"].attrs["json"]
)["backend"]["filename"]
f.close()
decode_embedded(img_string) ¶
Decode an embedded image string into a numpy array.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
img_string | ndarray | Binary string of the image as an `int8` numpy vector with the bytes as values corresponding to the format-encoded image. | required |
Returns:
Type | Description |
---|---|
ndarray | The decoded image as a numpy array of shape `(height, width, channels)`. If a rank-2 image is decoded, it will be expanded such that channels will be 1. This method does not apply grayscale conversion as per the `grayscale` attribute. Use the `get_frame` or `get_frames` methods of the `VideoBackend` to apply grayscale conversion rather than calling this function directly. |
Source code in sleap_io/io/video_reading.py
def decode_embedded(self, img_string: np.ndarray) -> np.ndarray:
"""Decode an embedded image string into a numpy array.
Args:
img_string: Binary string of the image as a `int8` numpy vector with the
bytes as values corresponding to the format-encoded image.
Returns:
The decoded image as a numpy array of shape `(height, width, channels)`. If
a rank-2 image is decoded, it will be expanded such that channels will be 1.
This method does not apply grayscale conversion as per the `grayscale`
attribute. Use the `get_frame` or `get_frames` methods of the `VideoBackend`
to apply grayscale conversion rather than calling this function directly.
"""
if "cv2" in sys.modules:
img = cv2.imdecode(img_string, cv2.IMREAD_UNCHANGED)
else:
img = iio.imread(BytesIO(img_string), extension=f".{self.image_format}")
if img.ndim == 2:
img = np.expand_dims(img, axis=-1)
return img
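For illustration, a hedged sketch of decoding one stored image by hand, assuming `backend` is an `HDF5Video` pointed at an embedded dataset as above (normally `get_frame` does this for you):

```python
import h5py

# Read the raw encoded bytes of the first embedded image.
with h5py.File(backend.filename, "r") as f:
    raw = f[backend.dataset][0]  # int8 vector of format-encoded bytes

img = backend.decode_embedded(raw)  # (height, width, channels)
```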
has_frame(frame_idx) ¶
Check if a frame index is contained in the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_idx | int | Index of frame to check. | required |
Returns:
Type | Description |
---|---|
bool | `True` if the index is contained in the video, otherwise `False`. |
Source code in sleap_io/io/video_reading.py
def has_frame(self, frame_idx: int) -> bool:
"""Check if a frame index is contained in the video.
Args:
frame_idx: Index of frame to check.
Returns:
`True` if the index is contained in the video, otherwise `False`.
"""
if self.frame_map:
return frame_idx in self.frame_map
else:
return frame_idx < len(self)
read_test_frame() ¶
Read a single frame from the video to test for grayscale.
ImageVideo ¶
Bases: VideoBackend
Video backend for reading videos stored as image files.
This backend supports reading videos stored as a list of images.
Attributes:
Name | Type | Description |
---|---|---|
filename | | Path to image files. |
grayscale | | Whether to force grayscale. If None, autodetect on first frame load. |
Methods:
Name | Description |
---|---|
`find_images` | Find images in a folder and return a list of filenames. |
Attributes:
Name | Type | Description |
---|---|---|
num_frames | int | Number of frames in the video. |
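A minimal usage sketch (the folder name `frames/` is a placeholder):

```python
from sleap_io.io.video_reading import ImageVideo

filenames = ImageVideo.find_images("frames/")  # sorted list of image paths
backend = ImageVideo(filenames)
print(len(backend))         # one frame per image file
img = backend.get_frame(0)  # (height, width, channels)
```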
Source code in sleap_io/io/video_reading.py
@attrs.define
class ImageVideo(VideoBackend):
"""Video backend for reading videos stored as image files.
This backend supports reading videos stored as a list of images.
Attributes:
filename: Path to image files.
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
"""
EXTS = ("png", "jpg", "jpeg", "tif", "tiff", "bmp")
@staticmethod
def find_images(folder: str) -> list[str]:
"""Find images in a folder and return a list of filenames."""
folder = Path(folder)
return sorted(
[f.as_posix() for f in folder.glob("*") if f.suffix[1:] in ImageVideo.EXTS]
)
@property
def num_frames(self) -> int:
"""Number of frames in the video."""
return len(self.filename)
def _read_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.
Args:
frame_idx: Index of frame to read.
Returns:
The frame as a numpy array of shape `(height, width, channels)`.
Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frame` method of the `VideoBackend` class instead.
"""
img = iio.imread(self.filename[frame_idx])
if img.ndim == 2:
img = np.expand_dims(img, axis=-1)
return img
MediaVideo ¶
Bases: VideoBackend
Video backend for reading videos stored as common media files.
This backend supports reading through FFMPEG (the default), pyav, or OpenCV. Here are their trade-offs:
- "opencv": Fastest video reader, but only supports a limited number of codecs
and may not be able to read some videos. It requires `opencv-python` to be
installed. It is the fastest because it uses the OpenCV C++ library to read
videos, but is limited by the version of FFMPEG that was linked into it at
build time as well as the OpenCV version used.
- "FFMPEG": Slowest, but most reliable. This is the default backend. It requires
`imageio-ffmpeg` and a `ffmpeg` executable on the system path (which can be
installed via conda). The `imageio` plugin for FFMPEG reads frames into raw
bytes which are communicated to Python through STDOUT on a subprocess pipe,
which can be slow. However, it is the most reliable and feature-complete. If
you install the conda-forge version of ffmpeg, it will be compiled with
support for many codecs, including GPU-accelerated codecs like NVDEC for
H264 and others.
- "pyav": Supports most codecs that FFMPEG does, but not as complete or reliable
of an implementation in `imageio` as FFMPEG for some video types. It is
faster than FFMPEG because it uses the `av` package to read frames directly
into numpy arrays in memory without the need for a subprocess pipe. These
are Python bindings for the C library libav, which is the same library that
FFMPEG uses under the hood.
Attributes:
Name | Type | Description |
---|---|---|
filename | | Path to video file. |
grayscale | | Whether to force grayscale. If None, autodetect on first frame load. |
keep_open | | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. |
plugin | str | Video plugin to use. One of "opencv", "FFMPEG", or "pyav". If `None`, will use the first available plugin in the order listed above. |
Attributes:
Name | Type | Description |
---|---|---|
num_frames | int | Number of frames in the video. |
reader | object | Return the reader object for the video, caching if necessary. |
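A minimal sketch of selecting a plugin explicitly (the filename is a placeholder; "pyav" requires the `av` package to be installed):

```python
from sleap_io.io.video_reading import MediaVideo

backend = MediaVideo("clip.mp4", plugin="pyav")
imgs = backend.get_frames([0, 1, 2])  # (frames, height, width, channels)
```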
Source code in sleap_io/io/video_reading.py
@attrs.define
class MediaVideo(VideoBackend):
"""Video backend for reading videos stored as common media files.
This backend supports reading through FFMPEG (the default), pyav, or OpenCV. Here
are their trade-offs:
- "opencv": Fastest video reader, but only supports a limited number of codecs
and may not be able to read some videos. It requires `opencv-python` to be
installed. It is the fastest because it uses the OpenCV C++ library to read
videos, but is limited by the version of FFMPEG that was linked into it at
build time as well as the OpenCV version used.
- "FFMPEG": Slowest, but most reliable. This is the default backend. It requires
`imageio-ffmpeg` and a `ffmpeg` executable on the system path (which can be
installed via conda). The `imageio` plugin for FFMPEG reads frames into raw
bytes which are communicated to Python through STDOUT on a subprocess pipe,
which can be slow. However, it is the most reliable and feature-complete. If
you install the conda-forge version of ffmpeg, it will be compiled with
support for many codecs, including GPU-accelerated codecs like NVDEC for
H264 and others.
- "pyav": Supports most codecs that FFMPEG does, but not as complete or reliable
of an implementation in `imageio` as FFMPEG for some video types. It is
faster than FFMPEG because it uses the `av` package to read frames directly
into numpy arrays in memory without the need for a subprocess pipe. These
are Python bindings for the C library libav, which is the same library that
FFMPEG uses under the hood.
Attributes:
filename: Path to video file.
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
keep_open: Whether to keep the video reader open between calls to read frames.
If False, will close the reader after each call. If True (the default), it
will keep the reader open and cache it for subsequent calls which may
enhance the performance of reading multiple frames.
plugin: Video plugin to use. One of "opencv", "FFMPEG", or "pyav". If `None`,
will use the first available plugin in the order listed above.
"""
plugin: str = attrs.field(
validator=attrs.validators.in_(["opencv", "FFMPEG", "pyav"])
)
EXTS = ("mp4", "avi", "mov", "mj2", "mkv")
@plugin.default
def _default_plugin(self) -> str:
if "cv2" in sys.modules:
return "opencv"
elif "imageio_ffmpeg" in sys.modules:
return "FFMPEG"
elif "av" in sys.modules:
return "pyav"
else:
raise ImportError(
"No video plugins found. Install opencv-python, imageio-ffmpeg, or av."
)
@property
def reader(self) -> object:
"""Return the reader object for the video, caching if necessary."""
if self.keep_open:
if self._open_reader is None:
if self.plugin == "opencv":
self._open_reader = cv2.VideoCapture(self.filename)
elif self.plugin == "pyav" or self.plugin == "FFMPEG":
self._open_reader = iio.imopen(
self.filename, "r", plugin=self.plugin
)
return self._open_reader
else:
if self.plugin == "opencv":
return cv2.VideoCapture(self.filename)
elif self.plugin == "pyav" or self.plugin == "FFMPEG":
return iio.imopen(self.filename, "r", plugin=self.plugin)
@property
def num_frames(self) -> int:
"""Number of frames in the video."""
if self.plugin == "opencv":
return int(self.reader.get(cv2.CAP_PROP_FRAME_COUNT))
else:
props = iio.improps(self.filename, plugin=self.plugin)
n_frames = props.n_images
if np.isinf(n_frames):
legacy_reader = self.reader.legacy_get_reader()
# Note: This might be super slow for some videos, so maybe we should
# defer evaluation of this or give the user control over it.
n_frames = legacy_reader.count_frames()
return n_frames
def _read_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.
Args:
frame_idx: Index of frame to read.
Returns:
The frame as a numpy array of shape `(height, width, channels)`.
Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frame` method of the `VideoBackend` class instead.
"""
failed = False
if self.plugin == "opencv":
if self.reader.get(cv2.CAP_PROP_POS_FRAMES) != frame_idx:
self.reader.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
success, img = self.reader.read()
elif self.plugin == "pyav" or self.plugin == "FFMPEG":
if self.keep_open:
img = self.reader.read(index=frame_idx)
else:
with iio.imopen(self.filename, "r", plugin=self.plugin) as reader:
img = reader.read(index=frame_idx)
success = (not failed) and (img is not None)
if not success:
raise IndexError(f"Failed to read frame index {frame_idx}.")
return img
def _read_frames(self, frame_inds: list) -> np.ndarray:
"""Read a list of frames from the video.
Args:
frame_inds: List of indices of frames to read.
Returns:
The frame as a numpy array of shape `(frames, height, width, channels)`.
Notes:
This does not apply grayscale conversion. It is recommended to use the
`get_frames` method of the `VideoBackend` class instead.
"""
if self.plugin == "opencv":
if self.keep_open:
if self._open_reader is None:
self._open_reader = cv2.VideoCapture(self.filename)
reader = self._open_reader
else:
reader = cv2.VideoCapture(self.filename)
reader.set(cv2.CAP_PROP_POS_FRAMES, frame_inds[0])
imgs = []
for idx in frame_inds:
if reader.get(cv2.CAP_PROP_POS_FRAMES) != idx:
reader.set(cv2.CAP_PROP_POS_FRAMES, idx)
_, img = reader.read()
img = img[..., ::-1] # BGR -> RGB
imgs.append(img)
imgs = np.stack(imgs, axis=0)
elif self.plugin == "pyav" or self.plugin == "FFMPEG":
if self.keep_open:
if self._open_reader is None:
self._open_reader = iio.imopen(
self.filename, "r", plugin=self.plugin
)
reader = self._open_reader
imgs = np.stack([reader.read(index=idx) for idx in frame_inds], axis=0)
else:
with iio.imopen(self.filename, "r", plugin=self.plugin) as reader:
imgs = np.stack(
[reader.read(index=idx) for idx in frame_inds], axis=0
)
return imgs
VideoBackend ¶
Base class for video backends.
This class is not meant to be used directly. Instead, use the `from_filename` constructor to create a backend instance.
Attributes:
Name | Type | Description |
---|---|---|
filename | str \| Path \| list[str] \| list[Path] | Path to video file(s). |
grayscale | Optional[bool] | Whether to force grayscale. If None, autodetect on first frame load. |
keep_open | bool | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. |
Methods:
Name | Description |
---|---|
`__getitem__` | Return a single frame or a list of frames from the video. |
`__len__` | Return number of frames in the video. |
`detect_grayscale` | Detect whether the video is grayscale. |
`from_filename` | Create a VideoBackend from a filename. |
`get_frame` | Read a single frame from the video. |
`get_frames` | Read a list of frames from the video. |
`has_frame` | Check if a frame index is contained in the video. |
`read_test_frame` | Read a single frame from the video to test for grayscale. |
Attributes:
Name | Type | Description |
---|---|---|
frames | int | Number of frames in the video. |
img_shape | Tuple[int, int, int] | Shape of a single frame in the video. |
num_frames | int | Number of frames in the video. Must be implemented in subclasses. |
shape | Tuple[int, int, int, int] | Shape of the video as a tuple of `(frames, height, width, channels)`. |
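A minimal sketch of the intended entry point (the filename is a placeholder; the concrete subclass is chosen from the file extension):

```python
from sleap_io.io.video_reading import VideoBackend

backend = VideoBackend.from_filename("clip.mp4")  # -> MediaVideo here
print(backend.shape)   # (frames, height, width, channels)
img = backend[0]       # single frame via __getitem__
clip = backend[10:20]  # stack of frames via a slice
```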
Source code in sleap_io/io/video_reading.py
@attrs.define
class VideoBackend:
"""Base class for video backends.
This class is not meant to be used directly. Instead, use the `from_filename`
constructor to create a backend instance.
Attributes:
filename: Path to video file(s).
grayscale: Whether to force grayscale. If None, autodetect on first frame load.
keep_open: Whether to keep the video reader open between calls to read frames.
If False, will close the reader after each call. If True (the default), it
will keep the reader open and cache it for subsequent calls which may
enhance the performance of reading multiple frames.
"""
filename: str | Path | list[str] | list[Path]
grayscale: Optional[bool] = None
keep_open: bool = True
_cached_shape: Optional[Tuple[int, int, int, int]] = None
_open_reader: Optional[object] = None
@classmethod
def from_filename(
cls,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
**kwargs,
) -> VideoBackend:
"""Create a VideoBackend from a filename.
Args:
filename: Path to video file(s).
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
Returns:
VideoBackend subclass instance.
"""
if isinstance(filename, Path):
filename = filename.as_posix()
if type(filename) == str and Path(filename).is_dir():
filename = ImageVideo.find_images(filename)
if type(filename) == list:
filename = [Path(f).as_posix() for f in filename]
return ImageVideo(
filename, grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(ImageVideo.EXTS):
return ImageVideo(
[filename], grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(MediaVideo.EXTS):
return MediaVideo(
filename,
grayscale=grayscale,
keep_open=keep_open,
**_get_valid_kwargs(MediaVideo, kwargs),
)
elif filename.endswith(HDF5Video.EXTS):
return HDF5Video(
filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
**_get_valid_kwargs(HDF5Video, kwargs),
)
else:
raise ValueError(f"Unknown video file type: {filename}")
def _read_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video. Must be implemented in subclasses."""
raise NotImplementedError
def _read_frames(self, frame_inds: list) -> np.ndarray:
"""Read a list of frames from the video."""
return np.stack([self.get_frame(i) for i in frame_inds], axis=0)
def read_test_frame(self) -> np.ndarray:
"""Read a single frame from the video to test for grayscale.
Note:
This reads the frame at index 0. This may not be appropriate if the first
frame is not available in a given backend.
"""
return self._read_frame(0)
def detect_grayscale(self, test_img: np.ndarray | None = None) -> bool:
"""Detect whether the video is grayscale.
This works by reading in a test frame and comparing the first and last channel
for equality. It may fail in cases where, due to compression, the first and
last channels are not exactly the same.
Args:
test_img: Optional test image to use. If not provided, a test image will be
loaded via the `read_test_frame` method.
Returns:
Whether the video is grayscale. This value is also cached in the `grayscale`
attribute of the class.
"""
if test_img is None:
test_img = self.read_test_frame()
is_grayscale = np.array_equal(test_img[..., 0], test_img[..., -1])
self.grayscale = is_grayscale
return is_grayscale
@property
def num_frames(self) -> int:
"""Number of frames in the video. Must be implemented in subclasses."""
raise NotImplementedError
@property
def img_shape(self) -> Tuple[int, int, int]:
"""Shape of a single frame in the video."""
height, width, channels = self.read_test_frame().shape
if self.grayscale is None:
self.detect_grayscale()
if self.grayscale is False:
channels = 3
elif self.grayscale is True:
channels = 1
return int(height), int(width), int(channels)
@property
def shape(self) -> Tuple[int, int, int, int]:
"""Shape of the video as a tuple of `(frames, height, width, channels)`.
On first call, this will defer to `num_frames` and `img_shape` to determine the
full shape. This call may be expensive for some subclasses, so the result is
cached and returned on subsequent calls.
"""
if self._cached_shape is not None:
return self._cached_shape
else:
shape = (self.num_frames,) + self.img_shape
self._cached_shape = shape
return shape
@property
def frames(self) -> int:
"""Number of frames in the video."""
return self.shape[0]
def __len__(self) -> int:
"""Return number of frames in the video."""
return self.shape[0]
def has_frame(self, frame_idx: int) -> bool:
"""Check if a frame index is contained in the video.
Args:
frame_idx: Index of frame to check.
Returns:
`True` if the index is contained in the video, otherwise `False`.
"""
return frame_idx < len(self)
def get_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.
Args:
frame_idx: Index of frame to read.
Returns:
Frame as a numpy array of shape `(height, width, channels)` where the
`channels` dimension is 1 for grayscale videos and 3 for color videos.
Notes:
If the `grayscale` attribute is set to `True`, the `channels` dimension will
be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute
will be automatically set based on the first frame read.
See also: `get_frames`
"""
if not self.has_frame(frame_idx):
raise IndexError(f"Frame index {frame_idx} out of range.")
img = self._read_frame(frame_idx)
if self.grayscale is None:
self.detect_grayscale(img)
if self.grayscale:
img = img[..., [0]]
return img
def get_frames(self, frame_inds: list[int]) -> np.ndarray:
"""Read a list of frames from the video.
Depending on the backend implementation, this may be faster than reading frames
individually using `get_frame`.
Args:
frame_inds: List of frame indices to read.
Returns:
Frames as a numpy array of shape `(frames, height, width, channels)` where
`channels` dimension is 1 for grayscale videos and 3 for color videos.
Notes:
If the `grayscale` attribute is set to `True`, the `channels` dimension will
be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute
will be automatically set based on the first frame read.
See also: `get_frame`
"""
imgs = self._read_frames(frame_inds)
if self.grayscale is None:
self.detect_grayscale(imgs[0])
if self.grayscale:
imgs = imgs[..., [0]]
return imgs
def __getitem__(self, ind: int | list[int] | slice) -> np.ndarray:
"""Return a single frame or a list of frames from the video.
Args:
ind: Index or list of indices of frames to read.
Returns:
Frame or frames as a numpy array of shape `(height, width, channels)` if a
scalar index is provided, or `(frames, height, width, channels)` if a list
of indices is provided.
See also: get_frame, get_frames
"""
if np.isscalar(ind):
return self.get_frame(ind)
else:
if type(ind) is slice:
start = (ind.start or 0) % len(self)
stop = ind.stop or len(self)
if stop < 0:
stop = len(self) + stop
step = ind.step or 1
ind = range(start, stop, step)
return self.get_frames(ind)
frames: int property ¶
Number of frames in the video.
img_shape: Tuple[int, int, int] property ¶
Shape of a single frame in the video.
num_frames: int property ¶
Number of frames in the video. Must be implemented in subclasses.
shape: Tuple[int, int, int, int] property ¶
Shape of the video as a tuple of `(frames, height, width, channels)`.
On first call, this will defer to `num_frames` and `img_shape` to determine the full shape. This call may be expensive for some subclasses, so the result is cached and returned on subsequent calls.
__getitem__(ind) ¶
Return a single frame or a list of frames from the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ind | int \| list[int] \| slice | Index or list of indices of frames to read. | required |
Returns:
Type | Description |
---|---|
ndarray | Frame or frames as a numpy array of shape `(height, width, channels)` if a scalar index is provided, or `(frames, height, width, channels)` if a list of indices is provided. |
See also: get_frame, get_frames
Source code in sleap_io/io/video_reading.py
def __getitem__(self, ind: int | list[int] | slice) -> np.ndarray:
"""Return a single frame or a list of frames from the video.
Args:
ind: Index or list of indices of frames to read.
Returns:
Frame or frames as a numpy array of shape `(height, width, channels)` if a
scalar index is provided, or `(frames, height, width, channels)` if a list
of indices is provided.
See also: get_frame, get_frames
"""
if np.isscalar(ind):
return self.get_frame(ind)
else:
if type(ind) is slice:
start = (ind.start or 0) % len(self)
stop = ind.stop or len(self)
if stop < 0:
stop = len(self) + stop
step = ind.step or 1
ind = range(start, stop, step)
return self.get_frames(ind)
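A short sketch of the resulting indexing semantics, assuming `backend` is any `VideoBackend` instance:

```python
first = backend[0]          # (height, width, channels)
some = backend[[0, 5, 10]]  # (3, height, width, channels)
tail = backend[:-1]         # negative stop counts from the end
sparse = backend[::10]      # every 10th frame from the start
```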
__len__() ¶
Return number of frames in the video.
detect_grayscale(test_img=None) ¶
Detect whether the video is grayscale.
This works by reading in a test frame and comparing the first and last channel for equality. It may fail in cases where, due to compression, the first and last channels are not exactly the same.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
test_img | ndarray \| None | Optional test image to use. If not provided, a test image will be loaded via the `read_test_frame` method. | None |
Returns:
Type | Description |
---|---|
bool | Whether the video is grayscale. This value is also cached in the `grayscale` attribute of the class. |
Source code in sleap_io/io/video_reading.py
def detect_grayscale(self, test_img: np.ndarray | None = None) -> bool:
"""Detect whether the video is grayscale.
This works by reading in a test frame and comparing the first and last channel
for equality. It may fail in cases where, due to compression, the first and
last channels are not exactly the same.
Args:
test_img: Optional test image to use. If not provided, a test image will be
loaded via the `read_test_frame` method.
Returns:
Whether the video is grayscale. This value is also cached in the `grayscale`
attribute of the class.
"""
if test_img is None:
test_img = self.read_test_frame()
is_grayscale = np.array_equal(test_img[..., 0], test_img[..., -1])
self.grayscale = is_grayscale
return is_grayscale
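A small sketch of the channel-equality heuristic, assuming `backend` is any `VideoBackend` instance (the synthetic frame below has identical channels, so it reads as grayscale):

```python
import numpy as np

gray = np.random.randint(0, 255, (4, 4, 1), dtype=np.uint8)
rgb_from_gray = np.tile(gray, (1, 1, 3))  # first and last channels match
backend.detect_grayscale(test_img=rgb_from_gray)  # True; also sets .grayscale
```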
from_filename(filename, dataset=None, grayscale=None, keep_open=True, **kwargs) classmethod ¶
Create a VideoBackend from a filename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str \| list[str] | Path to video file(s). | required |
dataset | Optional[str] | Name of dataset in HDF5 file. | None |
grayscale | Optional[bool] | Whether to force grayscale. If None, autodetect on first frame load. | None |
keep_open | bool | Whether to keep the video reader open between calls to read frames. If False, will close the reader after each call. If True (the default), it will keep the reader open and cache it for subsequent calls which may enhance the performance of reading multiple frames. | True |
Returns:
Type | Description |
---|---|
VideoBackend | VideoBackend subclass instance. |
Source code in sleap_io/io/video_reading.py
@classmethod
def from_filename(
cls,
filename: str | list[str],
dataset: Optional[str] = None,
grayscale: Optional[bool] = None,
keep_open: bool = True,
**kwargs,
) -> VideoBackend:
"""Create a VideoBackend from a filename.
Args:
filename: Path to video file(s).
dataset: Name of dataset in HDF5 file.
grayscale: Whether to force grayscale. If None, autodetect on first frame
load.
keep_open: Whether to keep the video reader open between calls to read
frames. If False, will close the reader after each call. If True (the
default), it will keep the reader open and cache it for subsequent calls
which may enhance the performance of reading multiple frames.
Returns:
VideoBackend subclass instance.
"""
if isinstance(filename, Path):
filename = filename.as_posix()
if type(filename) == str and Path(filename).is_dir():
filename = ImageVideo.find_images(filename)
if type(filename) == list:
filename = [Path(f).as_posix() for f in filename]
return ImageVideo(
filename, grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(ImageVideo.EXTS):
return ImageVideo(
[filename], grayscale=grayscale, **_get_valid_kwargs(ImageVideo, kwargs)
)
elif filename.endswith(MediaVideo.EXTS):
return MediaVideo(
filename,
grayscale=grayscale,
keep_open=keep_open,
**_get_valid_kwargs(MediaVideo, kwargs),
)
elif filename.endswith(HDF5Video.EXTS):
return HDF5Video(
filename,
dataset=dataset,
grayscale=grayscale,
keep_open=keep_open,
**_get_valid_kwargs(HDF5Video, kwargs),
)
else:
raise ValueError(f"Unknown video file type: {filename}")
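A sketch of the extension-based dispatch (all paths are placeholders):

```python
from sleap_io.io.video_reading import VideoBackend

VideoBackend.from_filename("clip.mp4")        # -> MediaVideo
VideoBackend.from_filename("frame_0000.png")  # -> ImageVideo (single image)
VideoBackend.from_filename("labels.pkg.slp")  # -> HDF5Video
```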
get_frame(frame_idx) ¶
Read a single frame from the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_idx | int | Index of frame to read. | required |
Returns:
Type | Description |
---|---|
ndarray | Frame as a numpy array of shape `(height, width, channels)` where the `channels` dimension is 1 for grayscale videos and 3 for color videos. |
Notes
If the `grayscale` attribute is set to `True`, the `channels` dimension will be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute will be automatically set based on the first frame read.
See also: `get_frames`
Source code in sleap_io/io/video_reading.py
def get_frame(self, frame_idx: int) -> np.ndarray:
"""Read a single frame from the video.
Args:
frame_idx: Index of frame to read.
Returns:
Frame as a numpy array of shape `(height, width, channels)` where the
`channels` dimension is 1 for grayscale videos and 3 for color videos.
Notes:
If the `grayscale` attribute is set to `True`, the `channels` dimension will
be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute
will be automatically set based on the first frame read.
See also: `get_frames`
"""
if not self.has_frame(frame_idx):
raise IndexError(f"Frame index {frame_idx} out of range.")
img = self._read_frame(frame_idx)
if self.grayscale is None:
self.detect_grayscale(img)
if self.grayscale:
img = img[..., [0]]
return img
get_frames(frame_inds) ¶
Read a list of frames from the video.
Depending on the backend implementation, this may be faster than reading frames individually using `get_frame`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_inds | list[int] | List of frame indices to read. | required |
Returns:
Type | Description |
---|---|
ndarray | Frames as a numpy array of shape `(frames, height, width, channels)` where the `channels` dimension is 1 for grayscale videos and 3 for color videos. |
Notes
If the `grayscale` attribute is set to `True`, the `channels` dimension will be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute will be automatically set based on the first frame read.
See also: `get_frame`
Source code in sleap_io/io/video_reading.py
def get_frames(self, frame_inds: list[int]) -> np.ndarray:
"""Read a list of frames from the video.
Depending on the backend implementation, this may be faster than reading frames
individually using `get_frame`.
Args:
frame_inds: List of frame indices to read.
Returns:
Frames as a numpy array of shape `(frames, height, width, channels)` where
`channels` dimension is 1 for grayscale videos and 3 for color videos.
Notes:
If the `grayscale` attribute is set to `True`, the `channels` dimension will
be reduced to 1 if an RGB frame is loaded from the backend.
If the `grayscale` attribute is set to `None`, the `grayscale` attribute
will be automatically set based on the first frame read.
See also: `get_frame`
"""
imgs = self._read_frames(frame_inds)
if self.grayscale is None:
self.detect_grayscale(imgs[0])
if self.grayscale:
imgs = imgs[..., [0]]
return imgs
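A short sketch of a batch read, assuming `backend` is any `VideoBackend` instance (backends can avoid per-frame overhead here, e.g. the opencv plugin in `MediaVideo._read_frames` seeks only when the reader is not already positioned at the requested index):

```python
imgs = backend.get_frames([0, 1, 2, 3])
print(imgs.shape)  # (4, height, width, channels)
```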
has_frame(frame_idx) ¶
Check if a frame index is contained in the video.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_idx | int | Index of frame to check. | required |
Returns:
Type | Description |
---|---|
bool | `True` if the index is contained in the video, otherwise `False`. |
read_test_frame() ¶
Read a single frame from the video to test for grayscale.
Note
This reads the frame at index 0. This may not be appropriate if the first frame is not available in a given backend.