Compare commits

...

2 Commits

Author SHA1 Message Date
Alexander 26fceea6ad feat: highligh up to 30 sec 2025-05-22 14:21:38 +02:00
Alexander 83041a44d1 feat: minor chanes to highlight 2025-05-22 13:15:18 +02:00
3 changed files with 480 additions and 701 deletions

Binary file not shown.

1055
result.csv

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,7 @@ class VideoHighlighter:
join_gap: float = 0.2, join_gap: float = 0.2,
out_dir: Union[str, Path] = "clips", out_dir: Union[str, Path] = "clips",
concat: bool = True, concat: bool = True,
max_duration: float = 30.0,
) -> None: ) -> None:
self.video = Path(video) self.video = Path(video)
self.df = segments_df.copy() self.df = segments_df.copy()
@ -40,20 +41,33 @@ class VideoHighlighter:
self.join_gap = float(join_gap) self.join_gap = float(join_gap)
self.out_dir = Path(out_dir) self.out_dir = Path(out_dir)
self.concat = concat self.concat = concat
self.max_duration = float(max_duration)
if not {"start", "end"} <= set(self.df.columns): # Проверяем обязательные колонки
required_cols = {"start", "end"}
if not required_cols <= set(self.df.columns):
raise ValueError( raise ValueError(
"DataFrame должен содержать колонки 'start' и 'end'") "DataFrame должен содержать колонки 'start' и 'end'")
# Если нет importance_score, создаем с одинаковыми значениями
if "importance_score" not in self.df.columns:
print(
"Колонка 'importance_score' не найдена, использую одинаковые приоритеты")
self.df["importance_score"] = 1.0
self.out_dir.mkdir(parents=True, exist_ok=True) self.out_dir.mkdir(parents=True, exist_ok=True)
# ────────────────────────── public API ───────────────────────── # ────────────────────────── public API ─────────────────────────
def cut(self) -> None: def cut(self) -> None:
intervals = self._prepare_intervals() intervals = self._prepare_intervals()
# Если общая длительность превышает лимит, выбираем самые важные
if self._calculate_total_duration(intervals) > self.max_duration:
intervals = self._select_best_intervals(intervals)
with VideoFileClip(str(self.video)) as video: with VideoFileClip(str(self.video)) as video:
clips = self._make_subclips(video, intervals) clips = self._make_subclips(video, intervals)
if self.concat: if self.concat and clips:
self._save_concat(clips) self._save_concat(clips)
# ────────────────────────── helpers ─────────────────────────── # ────────────────────────── helpers ───────────────────────────
@ -73,51 +87,107 @@ class VideoHighlighter:
return h * 3600 + m * 60 + s return h * 3600 + m * 60 + s
raise TypeError(f"Неподдерживаемый тип времени: {type(x)}") raise TypeError(f"Неподдерживаемый тип времени: {type(x)}")
def _prepare_intervals(self) -> List[Tuple[float, float]]: def _prepare_intervals(self) -> List[Tuple[float, float, float]]:
"""Возвращает список (start, end, importance_score)"""
df = self.df.copy() df = self.df.copy()
df["start"] = df["start"].apply(self._to_seconds) df["start"] = df["start"].apply(self._to_seconds)
df["end"] = df["end"].apply(self._to_seconds) df["end"] = df["end"].apply(self._to_seconds)
raw = list(df.sort_values("start")[ # Проверка валидности интервалов
["start", "end"]].itertuples(index=False, name=None)) invalid = df[df["start"] >= df["end"]]
merged = self._merge_intervals(raw) if not invalid.empty:
return self._add_padding(merged, total_dur=self._get_duration()) raise ValueError("Найдены некорректные интервалы: start >= end")
def _merge_intervals(self, intervals: List[Tuple[float, float]]) -> List[Tuple[float, float]]: # Получить длительность видео один раз
with VideoFileClip(str(self.video)) as v:
total_dur = v.duration
# Создаем список с importance_score
raw = list(df.sort_values("start")[
["start", "end", "importance_score"]].itertuples(index=False, name=None))
merged = self._merge_intervals(raw)
return self._add_padding(merged, total_dur=total_dur)
def _merge_intervals(self, intervals: List[Tuple[float, float, float]]) -> List[Tuple[float, float, float]]:
"""Объединяет перекрывающиеся интервалы, сохраняя максимальный importance_score"""
if not intervals: if not intervals:
return [] return []
intervals.sort()
# Сортируем по времени начала
intervals.sort(key=lambda x: x[0])
out = [list(intervals[0])] out = [list(intervals[0])]
for s, e in intervals[1:]:
last_e = out[-1][1] for s, e, importance in intervals[1:]:
last_s, last_e, last_importance = out[-1]
if s - last_e <= self.join_gap: # перекрытие / стык if s - last_e <= self.join_gap: # перекрытие / стык
# Объединяем интервалы, берем максимальный importance_score
out[-1][1] = max(last_e, e) out[-1][1] = max(last_e, e)
out[-1][2] = max(last_importance, importance)
else: else:
out.append([s, e]) out.append([s, e, importance])
return [tuple(x) for x in out] return [tuple(x) for x in out]
def _add_padding(self, intervals: List[Tuple[float, float]], total_dur: float) -> List[Tuple[float, float]]: def _add_padding(self, intervals: List[Tuple[float, float, float]], total_dur: float) -> List[Tuple[float, float, float]]:
return [(max(0.0, s - self.pad), min(total_dur, e + self.pad)) for s, e in intervals] """Добавляет padding, сохраняя importance_score"""
return [
(max(0.0, s - self.pad), min(total_dur, e + self.pad), importance)
for s, e, importance in intervals
]
def _get_duration(self) -> float: def _calculate_total_duration(self, intervals: List[Tuple[float, float, float]]) -> float:
with VideoFileClip(str(self.video)) as v: """Вычисляет общую длительность всех интервалов"""
return v.duration return sum(e - s for s, e, _ in intervals)
def _make_subclips(self, video: VideoFileClip, intervals: List[Tuple[float, float]]) -> List[VideoFileClip]: def _select_best_intervals(self, intervals: List[Tuple[float, float, float]]) -> List[Tuple[float, float, float]]:
clips: list[VideoFileClip] = [] """Выбирает самые важные интервалы в пределах max_duration"""
for idx, (s, e) in enumerate(intervals, 1): # Сортируем по важности (убывание)
sorted_intervals = sorted(intervals, key=lambda x: x[2], reverse=True)
selected = []
total_duration = 0.0
print(f"Ограничиваю длительность до {
self.max_duration} сек, выбираю самые важные сегменты:")
for s, e, importance in sorted_intervals:
segment_duration = e - s
if total_duration + segment_duration <= self.max_duration:
selected.append((s, e, importance))
total_duration += segment_duration
print(f" ✓ Сегмент {
s:.1f}-{e:.1f}с (важность: {importance:.2f}, длительность: {segment_duration:.1f}с)")
else:
print(f" ✗ Пропускаю {
s:.1f}-{e:.1f}с (важность: {importance:.2f}) - превышен лимит")
# Сортируем выбранные по времени для корректного воспроизведения
selected.sort(key=lambda x: x[0])
final_duration = sum(e - s for s, e, _ in selected)
print(f"Итоговая длительность: {final_duration:.1f}с из {
self.max_duration}с доступных")
return selected
def _make_subclips(self, video: VideoFileClip, intervals: List[Tuple[float, float, float]]) -> List[VideoFileClip]:
clips: List[VideoFileClip] = []
for idx, (s, e, importance) in enumerate(intervals, 1):
clip = video.subclipped(s, e) clip = video.subclipped(s, e)
if not self.concat: if not self.concat:
fname = self.out_dir / f"clip_{idx:02d}.mp4" fname = self.out_dir / \
f"clip_{idx:02d}_score_{importance:.2f}.mp4"
self._save_clip(clip, fname) self._save_clip(clip, fname)
clip.close() clip.close()
else:
clips.append(clip) clips.append(clip)
return clips return clips
def _save_clip(self, clip: VideoFileClip, fname: Path) -> None: def _save_clip(self, clip: VideoFileClip, fname: Path) -> None:
print(f"Сохраняю {fname.name:>12}: {clip.duration:6.2f} сек") print(f"Сохраняю {fname.name:>25}: {clip.duration:6.2f} сек")
clip.write_videofile( clip.write_videofile(
fname.as_posix(), str(fname),
codec="libx264", codec="libx264",
audio_codec="aac", audio_codec="aac",
temp_audiofile=str(fname.with_suffix('.m4a')), temp_audiofile=str(fname.with_suffix('.m4a')),
@ -126,11 +196,15 @@ class VideoHighlighter:
) )
def _save_concat(self, clips: List[VideoFileClip]) -> None: def _save_concat(self, clips: List[VideoFileClip]) -> None:
if not clips:
print("Нет клипов для объединения")
return
final = concatenate_videoclips(clips, method="compose") final = concatenate_videoclips(clips, method="compose")
outfile = self.out_dir / "highlights.mp4" outfile = self.out_dir / "highlights.mp4"
print(f"Сохраняю дайджест {outfile.name}: {final.duration:6.2f} сек") print(f"Сохраняю дайджест {outfile.name}: {final.duration:6.2f} сек")
final.write_videofile( final.write_videofile(
outfile.as_posix(), str(outfile),
codec="libx264", codec="libx264",
audio_codec="aac", audio_codec="aac",
temp_audiofile=str(self.out_dir / "temp_audio.m4a"), temp_audiofile=str(self.out_dir / "temp_audio.m4a"),