154 lines
6.2 KiB
Python
154 lines
6.2 KiB
Python
# highlight_cutter.py
|
||
"""
|
||
Класс VideoHighlighter
|
||
======================
|
||
|
||
Нарезает «важные» отрезки видео по DataFrame со столбцами
|
||
`start` и `end` (секунды). Умеет:
|
||
|
||
• объединять соседние / перекрывающиеся интервалы (join_gap)
|
||
• добавлять «окно» pad слева и справа
|
||
• сохранять клипы отдельно или склеивать в единый highlights.mp4
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
from pathlib import Path
|
||
from typing import List, Tuple, Union
|
||
import datetime as dt
|
||
|
||
import pandas as pd
|
||
from moviepy.video.io.VideoFileClip import VideoFileClip
|
||
from moviepy.video.compositing.CompositeVideoClip import concatenate_videoclips
|
||
|
||
|
||
class VideoHighlighter:
|
||
"""Высвечивает (вырезает) нужные фрагменты ролика."""
|
||
|
||
# ─────────────────────────── init ────────────────────────────
|
||
def __init__(
|
||
self,
|
||
video: Union[str, Path],
|
||
segments_df: pd.DataFrame,
|
||
pad: float = 1.0,
|
||
join_gap: float = 0.2,
|
||
out_dir: Union[str, Path] = "clips",
|
||
concat: bool = True,
|
||
) -> None:
|
||
self.video = Path(video)
|
||
self.df = segments_df.copy()
|
||
self.pad = float(pad)
|
||
self.join_gap = float(join_gap)
|
||
self.out_dir = Path(out_dir)
|
||
self.concat = concat
|
||
|
||
if not {"start", "end"} <= set(self.df.columns):
|
||
raise ValueError(
|
||
"DataFrame должен содержать колонки 'start' и 'end'")
|
||
|
||
self.out_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# ────────────────────────── public API ─────────────────────────
|
||
def cut(self) -> None:
|
||
intervals = self._prepare_intervals()
|
||
|
||
with VideoFileClip(str(self.video)) as video:
|
||
clips = self._make_subclips(video, intervals)
|
||
if self.concat:
|
||
self._save_concat(clips)
|
||
|
||
# ────────────────────────── helpers ───────────────────────────
|
||
def _to_seconds(self, x) -> float:
|
||
"""Любое представление времени → float секунд."""
|
||
if isinstance(x, (int, float)):
|
||
return float(x)
|
||
if isinstance(x, dt.time):
|
||
return x.hour * 3600 + x.minute * 60 + x.second + x.microsecond / 1_000_000
|
||
if isinstance(x, str):
|
||
# принимает форматы HH:MM:SS, HH:MM:SS.mmm или просто число
|
||
parts = x.replace(",", ".").split(":")
|
||
if len(parts) == 1:
|
||
return float(parts[0])
|
||
h, m, s = map(float, parts) if len(
|
||
parts) == 3 else (0, *map(float, parts))
|
||
return h * 3600 + m * 60 + s
|
||
raise TypeError(f"Неподдерживаемый тип времени: {type(x)}")
|
||
|
||
def _prepare_intervals(self) -> List[Tuple[float, float]]:
|
||
df = self.df.copy()
|
||
df["start"] = df["start"].apply(self._to_seconds)
|
||
df["end"] = df["end"].apply(self._to_seconds)
|
||
|
||
# Добавить проверку валидности
|
||
invalid = df[df["start"] >= df["end"]]
|
||
if not invalid.empty:
|
||
raise ValueError("Найдены некорректные интервалы: start >= end")
|
||
|
||
raw = list(df.sort_values("start")[
|
||
["start", "end"]].itertuples(index=False, name=None))
|
||
merged = self._merge_intervals(raw)
|
||
|
||
# Получить длительность один раз
|
||
with VideoFileClip(str(self.video)) as v:
|
||
total_dur = v.duration
|
||
|
||
return self._add_padding(merged, total_dur=total_dur)
|
||
|
||
def _merge_intervals(self, intervals: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
|
||
if not intervals:
|
||
return []
|
||
intervals.sort()
|
||
out = [list(intervals[0])]
|
||
for s, e in intervals[1:]:
|
||
last_e = out[-1][1]
|
||
if s - last_e <= self.join_gap: # перекрытие / стык
|
||
out[-1][1] = max(last_e, e)
|
||
else:
|
||
out.append([s, e])
|
||
return [tuple(x) for x in out]
|
||
|
||
def _add_padding(self, intervals: List[Tuple[float, float]], total_dur: float) -> List[Tuple[float, float]]:
|
||
return [(max(0.0, s - self.pad), min(total_dur, e + self.pad)) for s, e in intervals]
|
||
|
||
def _get_duration(self) -> float:
|
||
with VideoFileClip(str(self.video)) as v:
|
||
return v.duration
|
||
|
||
def _make_subclips(self, video: VideoFileClip, intervals: List[Tuple[float, float]]) -> List[VideoFileClip]:
|
||
clips: list[VideoFileClip] = []
|
||
for idx, (s, e) in enumerate(intervals, 1):
|
||
clip = video.subclipped(s, e)
|
||
if not self.concat:
|
||
fname = self.out_dir / f"clip_{idx:02d}.mp4"
|
||
self._save_clip(clip, fname)
|
||
clip.close()
|
||
else:
|
||
clips.append(clip) # Добавляем только если concat=True
|
||
return clips
|
||
|
||
def _save_clip(self, clip: VideoFileClip, fname: Path) -> None:
|
||
print(f"Сохраняю {fname.name:>12}: {clip.duration:6.2f} сек")
|
||
clip.write_videofile(
|
||
fname.as_posix(),
|
||
codec="libx264",
|
||
audio_codec="aac",
|
||
temp_audiofile=str(fname.with_suffix('.m4a')),
|
||
remove_temp=True,
|
||
logger=None,
|
||
)
|
||
|
||
def _save_concat(self, clips: List[VideoFileClip]) -> None:
|
||
final = concatenate_videoclips(clips, method="compose")
|
||
outfile = self.out_dir / "highlights.mp4"
|
||
print(f"Сохраняю дайджест {outfile.name}: {final.duration:6.2f} сек")
|
||
final.write_videofile(
|
||
outfile.as_posix(),
|
||
codec="libx264",
|
||
audio_codec="aac",
|
||
temp_audiofile=str(self.out_dir / "temp_audio.m4a"),
|
||
remove_temp=True,
|
||
logger=None,
|
||
)
|
||
final.close()
|
||
for c in clips:
|
||
c.close()
|