segmentor.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import os
  2. import random
  3. from typing import List, Tuple
  4. import matplotlib.pyplot as plt
  5. import numpy as np
  6. import torch
  7. from PIL.Image import Image
  8. from matplotlib.pyplot import figure, imshow, axis
  9. from pytube import YouTube
  10. from torch import nn
  11. from pipeline import BuildDataset
  12. # The images that make up a segment, paired with the segment's length in seconds
  13. Segment = Tuple[List[Image], int]
  14. class Segmentor:
  15. def __init__(self,
  16. model: nn.Module,
  17. min_frames: int,
  18. threshold: float):
  19. self.model = model
  20. self.min_frames = min_frames
  21. self.threshold = threshold
  22. @staticmethod
  23. def _segmentor(preds: List[int],
  24. min_frames: int,
  25. threshold: float) -> List[List[int]]:
  26. candidates = []
  27. n = len(preds)
  28. for idx_start in range(n):
  29. if preds[idx_start] == 1:
  30. if n - idx_start >= min_frames:
  31. best_here = (-1, (-1, -1))
  32. for idx_end in range(idx_start + min_frames - 1, len(preds)):
  33. if preds[idx_end] == 1:
  34. if np.mean(preds[idx_start:idx_end + 1]) >= threshold:
  35. frames = idx_end - idx_start + 1
  36. endpoints = (idx_start, idx_end)
  37. if frames > best_here[0]:
  38. best_here = (frames, endpoints)
  39. if best_here[0] > 0:
  40. candidates.append(best_here[1])
  41. overlap = True
  42. while overlap:
  43. overlap = False
  44. for i in range(len(candidates)):
  45. ref_idx_start, ref_idx_end = candidates[i]
  46. for j in range(i + 1, len(candidates)):
  47. comp_idx_start, comp_idx_end = candidates[j]
  48. if ref_idx_start <= comp_idx_end <= ref_idx_end or ref_idx_start <= comp_idx_start <= ref_idx_end:
  49. # overlapping, take the longer one
  50. if comp_idx_end - comp_idx_end > ref_idx_end - ref_idx_start:
  51. del candidates[i]
  52. else:
  53. del candidates[j]
  54. overlap = True
  55. if overlap:
  56. break
  57. if overlap:
  58. break
  59. return [list(range(idx_start, idx_end + 1)) for idx_start, idx_end in candidates]
  60. @staticmethod
  61. def _torch_img_to_pil(img: torch.Tensor) -> Image:
  62. return BuildDataset.transform_reverse(img)
  63. @staticmethod
  64. def _get_segment_len(indices: List[int]):
  65. return max(indices) - min(indices) + 1
  66. def segmentor(self, preds: List[int], images: List[torch.Tensor]) -> List[Segment]:
  67. segment_list = self._segmentor(preds, self.min_frames, self.threshold)
  68. return [
  69. ([self._torch_img_to_pil(images[idx])
  70. for idx in segment_idx], self._get_segment_len(segment_idx))
  71. for segment_idx in segment_list]
  72. def _predict(self, audio: torch.Tensor, image: torch.Tensor) -> int:
  73. return int(torch.max(self.model(audio.unsqueeze(0), image.unsqueeze(0)), 1)[1][0])
  74. def get_segments(self, path_video: str) -> List[Segment]:
  75. audio, images = BuildDataset.one_video_extract_audio_and_stills(path_video)
  76. preds = [self._predict(audio[idx], images[idx]) for idx in range(len(images))]
  77. return self.segmentor(preds, images)
  78. @staticmethod
  79. def show_images_horizontally(images: List[Image]) -> None:
  80. # https://stackoverflow.com/questions/36006136/how-to-display-images-in-a-row-with-ipython-display
  81. fig = figure(figsize=(20, 20))
  82. number_of_files = len(images)
  83. for i in range(number_of_files):
  84. a = fig.add_subplot(1, number_of_files, i + 1)
  85. image = images[i]
  86. imshow(image)
  87. axis('off')
  88. plt.show()
  89. def visualize_segments(self, path_video: str, n_to_show: int = 10) -> None:
  90. segments = self.get_segments(path_video)
  91. n_segments = len(segments)
  92. print(f'Found {len(segments)} segments')
  93. if n_segments > 0:
  94. for i, (segment_images, segment_len) in enumerate(segments):
  95. print(f'Segment {i + 1}, {segment_len} seconds')
  96. print(f'First {n_to_show}')
  97. self.show_images_horizontally(segment_images[:n_to_show])
  98. print(f'{n_to_show} random shots')
  99. self.show_images_horizontally(random.sample(segment_images, n_to_show))
  100. print('Last 10')
  101. self.show_images_horizontally(segment_images[-n_to_show:])
  102. print('=' * 10)
  103. @staticmethod
  104. def _download_youtube_video(youtube_id: str, show_title: bool = True) -> str:
  105. yt = YouTube(f'http://youtube.com/watch?v={youtube_id}')
  106. if show_title:
  107. print(f'Title: {yt.title}')
  108. yt_stream = yt.streams.first()
  109. path = f'{yt_stream.default_filename}'
  110. yt_stream.download()
  111. return path
  112. def visualize_segments_youtube(self,
  113. youtube_id: str,
  114. n_to_show: int = 10,
  115. show_title: bool = True,
  116. remove_file: bool = True):
  117. path = self._download_youtube_video(youtube_id, show_title)
  118. self.visualize_segments(path, n_to_show)
  119. if remove_file:
  120. os.remove(path)