recap_gen/video_editor/highlight.py

143 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# highlight.py
"""
VideoHighlighter class
======================
Cuts the "important" segments out of a video according to a DataFrame with
`start` and `end` columns (seconds). It can:
• merge adjacent / overlapping intervals (join_gap)
• add a padding "window" (pad) on the left and right of every interval
• save the clips separately or concatenate them into a single highlights.mp4
"""
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, Union
import datetime as dt
import pandas as pd
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.compositing.CompositeVideoClip import concatenate_videoclips
class VideoHighlighter:
    """Cut the requested fragments out of a video file.

    The fragments are described by a DataFrame with ``start`` and ``end``
    columns. Neighbouring intervals closer than ``join_gap`` seconds are
    merged, every interval is widened by ``pad`` seconds on both sides, and
    the result is written either as separate ``clip_NN.mp4`` files or as a
    single ``highlights.mp4`` inside ``out_dir``.
    """

    # ─────────────────────────── init ────────────────────────────
    def __init__(
        self,
        video: Union[str, Path],
        segments_df: pd.DataFrame,
        pad: float = 1.0,
        join_gap: float = 0.2,
        out_dir: Union[str, Path] = "clips",
        concat: bool = True,
    ) -> None:
        """Store the settings and create the output directory.

        Args:
            video: Path to the source video file.
            segments_df: DataFrame with ``start`` and ``end`` columns; a copy
                is stored, the caller's frame is not modified.
            pad: Seconds added before and after every interval.
            join_gap: Maximum gap (seconds) between two intervals for them
                to be merged into one.
            out_dir: Directory for the produced files (created if missing).
            concat: If true, write one ``highlights.mp4``; otherwise write
                one file per interval.

        Raises:
            ValueError: If ``segments_df`` lacks a ``start`` or ``end`` column.
        """
        self.video = Path(video)
        self.df = segments_df.copy()
        self.pad = float(pad)
        self.join_gap = float(join_gap)
        self.out_dir = Path(out_dir)
        self.concat = concat
        if not {"start", "end"} <= set(self.df.columns):
            raise ValueError(
                "DataFrame должен содержать колонки 'start' и 'end'")
        self.out_dir.mkdir(parents=True, exist_ok=True)

    # ────────────────────────── public API ─────────────────────────
    def cut(self) -> None:
        """Cut the video according to the stored segments and save the result."""
        # Open the source once: the same handle supplies the duration used
        # for clamping and the frames for the sub-clips.
        with VideoFileClip(str(self.video)) as video:
            intervals = self._prepare_intervals(total_dur=video.duration)
            if not intervals:
                # Nothing to cut; concatenating an empty clip list would fail.
                print("Нет интервалов для нарезки")
                return
            clips = self._make_subclips(video, intervals)
            if self.concat:
                self._save_concat(clips)

    # ────────────────────────── helpers ───────────────────────────
    def _to_seconds(self, x) -> float:
        """Convert a supported time representation to float seconds.

        Accepted inputs: ``int``/``float`` (already seconds),
        ``datetime.timedelta`` (and subclasses), ``datetime.time``, and
        strings of the form ``HH:MM:SS``, ``HH:MM:SS.mmm``, ``MM:SS`` or a
        plain number. A comma is accepted as the decimal separator.

        Raises:
            ValueError: If a string has more than three ``:``-separated
                fields or a field is not a number.
            TypeError: If the type of ``x`` is not supported.
        """
        if isinstance(x, (int, float)):
            return float(x)
        if isinstance(x, dt.timedelta):
            return x.total_seconds()
        if isinstance(x, dt.time):
            return x.hour * 3600 + x.minute * 60 + x.second + x.microsecond / 1_000_000
        if isinstance(x, str):
            parts = x.replace(",", ".").split(":")
            if len(parts) > 3:
                raise ValueError(f"Неверный формат времени: {x!r}")
            fields = [float(p) for p in parts]
            # Left-fill the missing leading fields (hours, minutes) with zeros.
            h, m, s = [0.0] * (3 - len(fields)) + fields
            return h * 3600 + m * 60 + s
        raise TypeError(f"Неподдерживаемый тип времени: {type(x)}")

    def _prepare_intervals(
        self, total_dur: Union[float, None] = None
    ) -> List[Tuple[float, float]]:
        """Build the final, non-overlapping list of ``(start, end)`` intervals.

        Steps: convert both columns to seconds, merge intervals closer than
        ``join_gap``, pad and clamp to ``[0, total_dur]``, drop intervals
        that are empty after clamping, and merge again so that padding does
        not make neighbouring clips overlap.

        Args:
            total_dur: Video duration in seconds. When omitted, the video
                file is opened to read it.
        """
        if total_dur is None:
            total_dur = self._get_duration()
        starts = self.df["start"].apply(self._to_seconds)
        ends = self.df["end"].apply(self._to_seconds)
        merged = self._merge_intervals(list(zip(starts, ends)))
        padded = [
            (s, e)
            for s, e in self._add_padding(merged, total_dur=total_dur)
            if e > s  # e.g. a segment lying entirely past the end of the video
        ]
        # Two intervals kept apart by join_gap may overlap once padded;
        # fuse them (gap 0) so the same footage is not emitted twice.
        return self._merge_intervals(padded, gap=0.0)

    def _merge_intervals(
        self,
        intervals: List[Tuple[float, float]],
        gap: Union[float, None] = None,
    ) -> List[Tuple[float, float]]:
        """Merge intervals whose distance does not exceed ``gap``.

        Args:
            intervals: ``(start, end)`` pairs in any order; not modified.
            gap: Maximum distance between the end of one interval and the
                start of the next for them to be joined. Defaults to
                ``self.join_gap``.

        Returns:
            Merged intervals sorted by start time.
        """
        if gap is None:
            gap = self.join_gap
        merged: list = []
        # sorted() works on a copy, leaving the caller's list untouched.
        for s, e in sorted(intervals):
            if merged and s - merged[-1][1] <= gap:  # overlap / adjacency
                merged[-1][1] = max(merged[-1][1], e)
            else:
                merged.append([s, e])
        return [tuple(pair) for pair in merged]

    def _add_padding(
        self, intervals: List[Tuple[float, float]], total_dur: float
    ) -> List[Tuple[float, float]]:
        """Widen every interval by ``self.pad`` and clamp it to ``[0, total_dur]``."""
        return [
            (max(0.0, s - self.pad), min(total_dur, e + self.pad))
            for s, e in intervals
        ]

    def _get_duration(self) -> float:
        """Open the video file and return its ``duration`` attribute."""
        with VideoFileClip(str(self.video)) as v:
            return v.duration

    def _make_subclips(
        self, video: VideoFileClip, intervals: List[Tuple[float, float]]
    ) -> List[VideoFileClip]:
        """Create one sub-clip per interval.

        When ``self.concat`` is false each sub-clip is written to
        ``clip_NN.mp4`` and closed immediately; the returned list then holds
        already-closed clips.
        """
        # NOTE(review): sub-clips appear to share the parent's reader in
        # moviepy, so closing one may affect `video` - confirm against the
        # installed moviepy version.
        clips: list[VideoFileClip] = []
        for idx, (s, e) in enumerate(intervals, 1):
            clip = video.subclipped(s, e)
            if not self.concat:
                fname = self.out_dir / f"clip_{idx:02d}.mp4"
                try:
                    self._save_clip(clip, fname)
                finally:
                    # Close the clip even when encoding fails.
                    clip.close()
            clips.append(clip)
        return clips

    def _save_clip(self, clip: VideoFileClip, fname: Path) -> None:
        """Encode a single clip to ``fname`` (H.264 video, AAC audio)."""
        print(f"Сохраняю {fname.name:>12}: {clip.duration:6.2f} сек")
        clip.write_videofile(
            fname.as_posix(),
            codec="libx264",
            audio_codec="aac",
            temp_audiofile=str(fname.with_suffix('.m4a')),
            remove_temp=True,
            logger=None,
        )

    def _save_concat(self, clips: List[VideoFileClip]) -> None:
        """Concatenate ``clips`` and write them to ``highlights.mp4``.

        All clips (and the concatenated clip) are closed afterwards, also
        when concatenation or encoding raises.
        """
        try:
            final = concatenate_videoclips(clips, method="compose")
            try:
                outfile = self.out_dir / "highlights.mp4"
                print(f"Сохраняю дайджест {outfile.name}: {final.duration:6.2f} сек")
                final.write_videofile(
                    outfile.as_posix(),
                    codec="libx264",
                    audio_codec="aac",
                    temp_audiofile=str(self.out_dir / "temp_audio.m4a"),
                    remove_temp=True,
                    logger=None,
                )
            finally:
                final.close()
        finally:
            for c in clips:
                c.close()