recap_gen/video_editor/highlight.py

217 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# highlight_cutter.py
"""
Класс VideoHighlighter
======================
Нарезает «важные» отрезки видео по DataFrame со столбцами
`start` и `end` (секунды). Умеет:
• объединять соседние / перекрывающиеся интервалы (join_gap)
• добавлять «окно» pad слева и справа
• сохранять клипы отдельно или склеивать в единый highlights.mp4
"""
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple, Union
import datetime as dt
import pandas as pd
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.compositing.CompositeVideoClip import concatenate_videoclips
class VideoHighlighter:
"""Высвечивает (вырезает) нужные фрагменты ролика."""
# ─────────────────────────── init ────────────────────────────
def __init__(
self,
video: Union[str, Path],
segments_df: pd.DataFrame,
pad: float = 1.0,
join_gap: float = 0.2,
out_dir: Union[str, Path] = "clips",
concat: bool = True,
max_duration: float = 30.0,
) -> None:
self.video = Path(video)
self.df = segments_df.copy()
self.pad = float(pad)
self.join_gap = float(join_gap)
self.out_dir = Path(out_dir)
self.concat = concat
self.max_duration = float(max_duration)
# Проверяем обязательные колонки
required_cols = {"start", "end"}
if not required_cols <= set(self.df.columns):
raise ValueError(
"DataFrame должен содержать колонки 'start' и 'end'")
# Если нет importance_score, создаем с одинаковыми значениями
if "importance_score" not in self.df.columns:
print(
"Колонка 'importance_score' не найдена, использую одинаковые приоритеты")
self.df["importance_score"] = 1.0
self.out_dir.mkdir(parents=True, exist_ok=True)
# ────────────────────────── public API ─────────────────────────
def cut(self) -> None:
intervals = self._prepare_intervals()
# Если общая длительность превышает лимит, выбираем самые важные
if self._calculate_total_duration(intervals) > self.max_duration:
intervals = self._select_best_intervals(intervals)
with VideoFileClip(str(self.video)) as video:
clips = self._make_subclips(video, intervals)
if self.concat and clips:
self._save_concat(clips)
# ────────────────────────── helpers ───────────────────────────
def _to_seconds(self, x) -> float:
"""Любое представление времени → float секунд."""
if isinstance(x, (int, float)):
return float(x)
if isinstance(x, dt.time):
return x.hour * 3600 + x.minute * 60 + x.second + x.microsecond / 1_000_000
if isinstance(x, str):
# принимает форматы HH:MM:SS, HH:MM:SS.mmm или просто число
parts = x.replace(",", ".").split(":")
if len(parts) == 1:
return float(parts[0])
h, m, s = map(float, parts) if len(
parts) == 3 else (0, *map(float, parts))
return h * 3600 + m * 60 + s
raise TypeError(f"Неподдерживаемый тип времени: {type(x)}")
def _prepare_intervals(self) -> List[Tuple[float, float, float]]:
"""Возвращает список (start, end, importance_score)"""
df = self.df.copy()
df["start"] = df["start"].apply(self._to_seconds)
df["end"] = df["end"].apply(self._to_seconds)
# Проверка валидности интервалов
invalid = df[df["start"] >= df["end"]]
if not invalid.empty:
raise ValueError("Найдены некорректные интервалы: start >= end")
# Получить длительность видео один раз
with VideoFileClip(str(self.video)) as v:
total_dur = v.duration
# Создаем список с importance_score
raw = list(df.sort_values("start")[
["start", "end", "importance_score"]].itertuples(index=False, name=None))
merged = self._merge_intervals(raw)
return self._add_padding(merged, total_dur=total_dur)
def _merge_intervals(self, intervals: List[Tuple[float, float, float]]) -> List[Tuple[float, float, float]]:
"""Объединяет перекрывающиеся интервалы, сохраняя максимальный importance_score"""
if not intervals:
return []
# Сортируем по времени начала
intervals.sort(key=lambda x: x[0])
out = [list(intervals[0])]
for s, e, importance in intervals[1:]:
last_s, last_e, last_importance = out[-1]
if s - last_e <= self.join_gap: # перекрытие / стык
# Объединяем интервалы, берем максимальный importance_score
out[-1][1] = max(last_e, e)
out[-1][2] = max(last_importance, importance)
else:
out.append([s, e, importance])
return [tuple(x) for x in out]
def _add_padding(self, intervals: List[Tuple[float, float, float]], total_dur: float) -> List[Tuple[float, float, float]]:
"""Добавляет padding, сохраняя importance_score"""
return [
(max(0.0, s - self.pad), min(total_dur, e + self.pad), importance)
for s, e, importance in intervals
]
def _calculate_total_duration(self, intervals: List[Tuple[float, float, float]]) -> float:
"""Вычисляет общую длительность всех интервалов"""
return sum(e - s for s, e, _ in intervals)
def _select_best_intervals(self, intervals: List[Tuple[float, float, float]]) -> List[Tuple[float, float, float]]:
"""Выбирает самые важные интервалы в пределах max_duration"""
# Сортируем по важности (убывание)
sorted_intervals = sorted(intervals, key=lambda x: x[2], reverse=True)
selected = []
total_duration = 0.0
print(f"Ограничиваю длительность до {
self.max_duration} сек, выбираю самые важные сегменты:")
for s, e, importance in sorted_intervals:
segment_duration = e - s
if total_duration + segment_duration <= self.max_duration:
selected.append((s, e, importance))
total_duration += segment_duration
print(f" ✓ Сегмент {
s:.1f}-{e:.1f}с (важность: {importance:.2f}, длительность: {segment_duration:.1f}с)")
else:
print(f" ✗ Пропускаю {
s:.1f}-{e:.1f}с (важность: {importance:.2f}) - превышен лимит")
# Сортируем выбранные по времени для корректного воспроизведения
selected.sort(key=lambda x: x[0])
final_duration = sum(e - s for s, e, _ in selected)
print(f"Итоговая длительность: {final_duration:.1f}с из {
self.max_duration}с доступных")
return selected
def _make_subclips(self, video: VideoFileClip, intervals: List[Tuple[float, float, float]]) -> List[VideoFileClip]:
clips: List[VideoFileClip] = []
for idx, (s, e, importance) in enumerate(intervals, 1):
clip = video.subclipped(s, e)
if not self.concat:
fname = self.out_dir / \
f"clip_{idx:02d}_score_{importance:.2f}.mp4"
self._save_clip(clip, fname)
clip.close()
else:
clips.append(clip)
return clips
def _save_clip(self, clip: VideoFileClip, fname: Path) -> None:
print(f"Сохраняю {fname.name:>25}: {clip.duration:6.2f} сек")
clip.write_videofile(
str(fname),
codec="libx264",
audio_codec="aac",
temp_audiofile=str(fname.with_suffix('.m4a')),
remove_temp=True,
logger=None,
)
def _save_concat(self, clips: List[VideoFileClip]) -> None:
if not clips:
print("Нет клипов для объединения")
return
final = concatenate_videoclips(clips, method="compose")
outfile = self.out_dir / "highlights.mp4"
print(f"Сохраняю дайджест {outfile.name}: {final.duration:6.2f} сек")
final.write_videofile(
str(outfile),
codec="libx264",
audio_codec="aac",
temp_audiofile=str(self.out_dir / "temp_audio.m4a"),
remove_temp=True,
logger=None,
)
final.close()
for c in clips:
c.close()