class VideoReader:
    """Read selected frames from a video source using PyAV.

    The source is ``self._source_path[0]`` (a path or an ``io.BytesIO``),
    opened with ``av.open``. Frame selection is driven by ``self._start``,
    ``self._step`` and ``self._stop``.
    """

    def __init__(self, source_path, step=1, start=0, stop=None, dimension=DimensionType.DIM_2D):
        """Forward the reader settings to the parent initializer.

        ``stop`` is passed on as ``stop + 1`` so that the ``i < self._stop``
        check in ``_has_frame`` keeps the caller's ``stop`` frame included.
        """
        # NOTE(review): super().__init__ is given keyword arguments, so this
        # class presumably derives from a reader base class that is not visible
        # in this snippet (it would also provide _source_path/_start/_step/_stop
        # and _get_preview) -- confirm the class header.
        super().__init__(
            source_path=source_path,
            step=step,
            start=start,
            stop=stop + 1 if stop is not None else stop,
            dimension=dimension,
        )

    def _has_frame(self, i):
        """Return True when zero-based frame index ``i`` is selected.

        A frame is selected when it is at or after ``self._start``, lies on
        the ``self._step`` grid counted from ``self._start``, and is below
        ``self._stop`` (when a stop bound is set).
        """
        if i < self._start:
            return False
        if (i - self._start) % self._step != 0:
            return False
        return self._stop is None or i < self._stop

    @staticmethod
    def _rotate_frame(frame, rotation):
        """Build a new bgr24 ``av.VideoFrame`` from ``frame`` passed through ``rotate_image``.

        ``rotation`` is the stream's ``rotate`` metadata value; ``rotate_image``
        (defined elsewhere) is called with ``360 - int(rotation)``.
        The returned frame does not carry the original ``pts``.
        """
        pixels = rotate_image(
            frame.to_ndarray(format='bgr24'),
            360 - int(rotation),
        )
        return av.VideoFrame.from_ndarray(pixels, format='bgr24')

    def _decode(self, container):
        """Yield ``(frame, source, pts)`` for every selected video frame.

        Frames are numbered from 0 in decode order across all video packets;
        only those accepted by ``_has_frame`` are yielded. Frames from a
        stream carrying a ``rotate`` metadata tag are rotated first.

        NOTE(review): the container is not closed here; its lifetime is left
        to the caller / garbage collector.
        """
        frame_num = 0
        for packet in container.demux():
            if packet.stream.type != 'video':
                continue
            # Take the angle from the same stream the tag is tested on.
            rotation = packet.stream.metadata.get('rotate')
            for image in packet.decode():
                index = frame_num
                frame_num += 1
                if not self._has_frame(index):
                    continue
                if rotation:
                    # from_ndarray creates a fresh frame, so carry pts over.
                    original_pts = image.pts
                    image = self._rotate_frame(image, rotation)
                    image.pts = original_pts
                yield (image, self._source_path[0], image.pts)

    def __iter__(self):
        """Open the source and return a generator over the selected frames."""
        container = self._get_av_container()
        source_video_stream = container.streams.video[0]
        source_video_stream.thread_type = 'AUTO'
        return self._decode(container)

    def get_progress(self, pos):
        """Return ``pos`` divided by the first video stream's duration.

        Returns ``None`` when the stream reports no duration (not every
        container provides a real value).
        NOTE(review): ``pos`` is presumably expressed in the same units as
        ``stream.duration`` -- confirm against callers.
        """
        container = self._get_av_container()
        try:
            duration = container.streams.video[0].duration
        finally:
            # The container is only needed to read the duration.
            container.close()
        return pos / duration if duration else None

    def _get_av_container(self):
        """Open ``self._source_path[0]`` with PyAV and return the container."""
        source = self._source_path[0]
        if isinstance(source, io.BytesIO):
            source.seek(0)  # required for re-reading
        return av.open(source)

    def get_preview(self):
        """Return ``self._get_preview`` applied to the first decoded frame.

        The frame is rotated first when the first video stream carries a
        ``rotate`` metadata tag.
        """
        container = self._get_av_container()
        try:
            stream = container.streams.video[0]
            preview = next(container.decode(stream))
            rotation = stream.metadata.get('rotate')
            if rotation:
                preview = self._rotate_frame(preview, rotation)
            # Convert while the container is still open.
            preview_image = preview.to_image()
        finally:
            container.close()
        return self._get_preview(preview_image)

    def get_image_size(self, i):
        """Return ``(width, height)`` of the first selected frame.

        ``i`` is accepted for interface compatibility but is not used.
        """
        image = next(iter(self))[0]
        return image.width, image.height
# Reference: official PyAV documentation (PyAV 8.0.1).



