Source code for subaligner.media_helper

import subprocess
import os
import threading
import traceback
import tempfile
import shutil
import atexit
import signal
import shlex

from typing import Optional, Tuple, List
from copy import deepcopy
from pysrt import SubRipFile, SubRipItem
from decimal import Decimal
from .embedder import FeatureEmbedder
from .exception import TerminalException
from .exception import NoFrameRateException
from .logger import Logger
from .utils import Utils

TEMP_DIR_PATH = tempfile.mkdtemp()


[docs]def clear_temp(*_): if os.path.isdir(TEMP_DIR_PATH): shutil.rmtree(TEMP_DIR_PATH)
[docs]class MediaHelper(object): """ Utility for processing media assets including audio, video and subtitle files. """ FFMPEG_BIN = os.getenv("FFMPEG_PATH") or os.getenv("ffmpeg_path") or "ffmpeg" AUDIO_FILE_EXTENSION = [".wav", ".aac"] __MIN_SECS_PER_WORD = 0.414 # 60 secs / 145 wpm __MIN_GAP_IN_SECS = ( 1 # minimum gap in seconds between consecutive subtitle during segmentation ) __CMD_TIME_OUT = 180 # time out for subprocess atexit.register(clear_temp) signal.signal(signal.SIGTERM, clear_temp) def __init__(self): self.__LOGGER = Logger().get_logger(__name__)
[docs] def extract_audio(self, video_file_path, decompress: bool = False, freq: int = 16000) -> str: """Extract audio track from the video file and save it to a WAV file. Arguments: video_file_path {string} -- The input video file path. Keyword Arguments: decompress {bool} -- Extract WAV if True otherwise extract AAC (default: {False}). freq {int} -- The audio sample frequency (default: {16000}). Returns: string -- The file path of the extracted audio. """ basename = os.path.basename(video_file_path) # Using WAV for training or prediction is faster than using AAC. # However the former will result in larger temporary audio files saved on the disk. if decompress: assert freq is not None, "Frequency is needed for decompression" audio_file_path = os.path.join( TEMP_DIR_PATH, f"{basename}{self.AUDIO_FILE_EXTENSION[0]}" ) else: audio_file_path = os.path.join( TEMP_DIR_PATH, f"{basename}{self.AUDIO_FILE_EXTENSION[1]}" ) command = ( "{0} -y -xerror -i {1} -ac 2 -ar {2} -vn {3}".format( self.FFMPEG_BIN, Utils.double_quoted(video_file_path), freq, Utils.double_quoted(audio_file_path) ) if decompress else "{0} -y -xerror -i {1} -vn -acodec copy {2}".format( self.FFMPEG_BIN, Utils.double_quoted(video_file_path), Utils.double_quoted(audio_file_path) ) ) print(command) with subprocess.Popen( shlex.split(command), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True, bufsize=1, ) as process: try: self.__LOGGER.debug("[{}] Running: {}".format(process.pid, command)) _, std_err = process.communicate(timeout=self.__CMD_TIME_OUT) self.__LOGGER.debug("[{}] {}".format(process.pid, std_err)) if process.returncode != 0: self.__LOGGER.error("[{}] Cannot extract audio from video: {}\n{}" .format(process.pid, video_file_path, std_err)) raise TerminalException( "Cannot extract audio from video: {}".format(video_file_path) ) self.__LOGGER.info( "[{}] Extracted audio file: {}".format(process.pid, audio_file_path)) return audio_file_path except subprocess.TimeoutExpired as te: self.__LOGGER.error("Timeout on extracting audio from video: {}".format(video_file_path)) if os.path.exists(audio_file_path): os.remove(audio_file_path) raise TerminalException( "Timeout on extracting audio from video: {}".format(video_file_path) ) from te except Exception as e: if os.path.exists(audio_file_path): os.remove(audio_file_path) if isinstance(e, TerminalException): raise e else: raise TerminalException( "Cannot extract audio from video: {}".format(video_file_path) ) from e except KeyboardInterrupt: self.__LOGGER.error( "[{}] Extracting audio from video {} interrupted".format( process.pid, video_file_path ) ) if os.path.exists(audio_file_path): os.remove(audio_file_path) process.send_signal(signal.SIGINT) raise TerminalException( "Extracting audio from video {} interrupted".format(video_file_path) ) finally: process.kill() os.system("stty sane")
[docs] def get_duration_in_seconds(self, start: Optional[str], end: Optional[str]) -> Optional[float]: """Get the duration in seconds between a start time and an end time. Arguments: start {string} -- The start time (e.g., 00:00:00,750). end {string} -- The end time (e.g., 00:00:10,230). Returns: float -- The duration in seconds. """ if start is None: start = "00:00:00,000" if end is None: return None start = start.replace(",", ".") end = end.replace(",", ".") start_h, start_m, start_s = map(Decimal, start.split(":")) end_h, end_m, end_s = map(Decimal, end.split(":")) return float( (end_h * 3600 + end_m * 60 + end_s) - (start_h * 3600 + start_m * 60 + start_s) )
[docs] def extract_audio_from_start_to_end(self, audio_file_path: str, start: str, end: Optional[str] = None) -> Tuple[str, Optional[float]]: """Extract audio based on the start time and the end time and save it to a temporary file. Arguments: audio_file_path {string} -- The path of the audio file. start {string} -- The start time (e.g., 00:00:00,750). Keyword Arguments: end {string} -- The end time (e.g., 00:00:10,230) (default: {None}). Returns: tuple -- The file path to the extracted audio and its duration. """ segment_duration = self.get_duration_in_seconds(start, end) basename = os.path.basename(audio_file_path) filename, extension = os.path.splitext(basename) start = start.replace(",", ".") if end is not None: end = end.replace(",", ".") segment_path = os.path.join(TEMP_DIR_PATH, f"{filename}_{str(start)}_{str(end)}{extension}") if end is not None: duration = self.get_duration_in_seconds(start, end) command = "{0} -y -xerror -i {1} -ss {2} -t {3} -acodec copy {4}".format( self.FFMPEG_BIN, Utils.double_quoted(audio_file_path), start, duration, Utils.double_quoted(segment_path) ) else: command = "{0} -y -xerror -i {1} -ss {2} -acodec copy {3}".format( self.FFMPEG_BIN, Utils.double_quoted(audio_file_path), start, Utils.double_quoted(segment_path) ) with subprocess.Popen( shlex.split(command), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, bufsize=1, ) as process: self.__LOGGER.debug("[{}] Running: {}".format(process.pid, command)) try: _, std_err = process.communicate(timeout=self.__CMD_TIME_OUT) self.__LOGGER.debug("[{}] {}".format(process.pid, std_err)) if process.returncode != 0: self.__LOGGER.error("[{}] Cannot clip audio: {} Return Code: {}\n{}" .format(process.pid, audio_file_path, process.returncode, std_err)) raise TerminalException( "Cannot clip audio: {} Return Code: {}".format(audio_file_path, process.returncode) ) self.__LOGGER.info( "[{}] Extracted audio segment: {}".format(process.pid, segment_path)) return segment_path, segment_duration except subprocess.TimeoutExpired as e: self.__LOGGER.error( "[{}] Extracting {} timed out: {}\n{}".format( process.pid, segment_path, str(e), "\n".join(traceback.format_stack()) ) ) traceback.print_tb(e.__traceback__) if os.path.exists(segment_path): os.remove(segment_path) raise TerminalException( "Timeout on extracting audio from audio: {} after {} seconds".format(audio_file_path, self.__CMD_TIME_OUT) ) from e except Exception as e: self.__LOGGER.error( "[{}] Extracting {} failed: {}\n{}".format( process.pid, segment_path, str(e), "\n".join(traceback.format_stack()) ) ) traceback.print_tb(e.__traceback__) if os.path.exists(segment_path): os.remove(segment_path) if isinstance(e, TerminalException): raise e else: raise TerminalException( "Cannot clip audio: {}".format(audio_file_path) ) from e except KeyboardInterrupt: self.__LOGGER.error( "[{}] Extracting with start and end from {} interrupted".format( process.pid, segment_path ) ) if os.path.exists(segment_path): os.remove(segment_path) process.send_signal(signal.SIGINT) raise TerminalException("Extracting with start and end from {} interrupted".format(segment_path)) finally: process.kill() os.system("stty sane")
[docs] def get_audio_segment_starts_and_ends(self, subs: List[SubRipItem]) -> Tuple[List[str], List[str], List[SubRipFile]]: """Group subtitle cues into larger segments in terms of silence gaps. Arguments: subs {list} -- A list of SupRip cues. Returns: tuple -- A list of start times, a list of end times and a list of grouped SubRip files. """ local_subs = self.__preprocess_subs(subs) segment_starts = [] segment_ends = [] combined = [] new_subs = [] current_start = str(local_subs[0].start) for i in range(len(local_subs)): if i == len(local_subs) - 1: combined.append(local_subs[i]) segment_starts.append(current_start) segment_ends.append(str(local_subs[i].end)) new_subs.append(SubRipFile(combined)) del combined[:] else: # Do not segment when the subtitle is too short duration = FeatureEmbedder.time_to_sec( local_subs[i].end ) - FeatureEmbedder.time_to_sec(local_subs[i].start) if duration < self.__MIN_SECS_PER_WORD: combined.append(local_subs[i]) continue # Do not segment consecutive subtitles having little or no gap. gap = FeatureEmbedder.time_to_sec( local_subs[i + 1].start ) - FeatureEmbedder.time_to_sec(local_subs[i].end) if ( local_subs[i].end == local_subs[i + 1].start or gap < self.__MIN_GAP_IN_SECS ): combined.append(local_subs[i]) continue combined.append(local_subs[i]) # The start time is set to last cue's end time segment_starts.append(current_start) # The end time cannot be set to next cue's start time due to possible overlay segment_ends.append(str(local_subs[i].end)) current_start = str(local_subs[i].end) new_subs.append(SubRipFile(combined)) del combined[:] return segment_starts, segment_ends, new_subs
[docs] def get_frame_rate(self, file_path: str) -> float: """Extract the video frame rate. Will return 25 when input is audio Arguments: file_path {string} -- The input audiovisual file path. Returns: float -- The frame rate """ discarded = "NUL:" if os.name == "nt" else "/dev/null" with subprocess.Popen( shlex.split("{0} -i {1} -t 00:00:10 -f null {2}".format(self.FFMPEG_BIN, Utils.double_quoted(file_path), discarded)), shell=False, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True, bufsize=1, ) as proc: with subprocess.Popen( ['grep', '-Eo', r"[0-9]{1,3}(\.[0-9]{1,3})?\sfps,"], shell=False, stdin=proc.stderr, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True, bufsize=1, ) as process: try: std_out, std_err = process.communicate(timeout=self.__CMD_TIME_OUT) if process.returncode != 0: self.__LOGGER.warning("[{}] Cannot extract the frame rate from video: {}\n{}".format(process.pid, file_path, std_err)) raise NoFrameRateException( "Cannot extract the frame rate from video: {}".format(file_path) ) fps = float(std_out.split(" ")[0]) # ffmpeg uses two decimal places so be this hack fps = fps if fps != 23.98 else 23.976 self.__LOGGER.info("[{}] Extracted frame rate: {} fps".format(process.pid, fps)) return fps except subprocess.TimeoutExpired as te: raise NoFrameRateException( "Timeout on extracting the frame rate from video: {}".format(file_path) ) from te except Exception as e: if isinstance(e, TerminalException): raise e else: raise NoFrameRateException( "Cannot extract the frame rate from video: {}".format(file_path) ) from e except KeyboardInterrupt: self.__LOGGER.error( "[{}] Extracting frame rate from video {} interrupted".format( process.pid, file_path ) ) process.send_signal(signal.SIGINT) proc.send_signal(signal.SIGINT) raise TerminalException("Extracting frame rate from video {} interrupted".format(file_path)) finally: process.kill() proc.kill() os.system("stty sane")
[docs] def refragment_with_min_duration(self, subs: List[SubRipItem], minimum_segment_duration: float) -> List[SubRipItem]: """Re-fragment a list of subtitle cues into new cues each of spans a minimum duration Arguments: subs {list} -- A list of SupRip cues. minimum_segment_duration {float} -- The minimum duration in seconds for each output subtitle cue. Returns: list -- A list of new SupRip cues after fragmentation. """ new_segment = [] new_segment_index = 0 new_segment_duration = 0.0 new_segment_text = "" new_subs = [] for sub in subs: if minimum_segment_duration > new_segment_duration: new_segment.append(sub) new_segment_duration += self.get_duration_in_seconds(str(sub.start), str(sub.end)) or 0.0 new_segment_text += "{}\n".format(sub.text) else: concatenated_item = SubRipItem(new_segment_index, new_segment[0].start, new_segment[-1].end, new_segment_text, new_segment[0].position) new_subs.append(concatenated_item) new_segment_index += 1 new_segment = [sub] new_segment_duration = self.get_duration_in_seconds(str(sub.start), str(sub.end)) or 0.0 new_segment_text = "{}\n".format(sub.text) if new_segment: concatenated_item = SubRipItem(new_segment_index, new_segment[0].start, new_segment[-1].end, new_segment_text, new_segment[0].position) new_subs.append(concatenated_item) return new_subs
def __preprocess_subs(self, subs: List[SubRipItem]) -> List[SubRipItem]: local_subs = deepcopy(subs) # Preprocess overlapping subtitles for i in range(len(local_subs)): if i != 0 and local_subs[i].start < local_subs[i - 1].end: self.__LOGGER.warning("Found overlapping subtitle cues and the earlier one's duration will be shortened.") local_subs[i - 1].end = local_subs[i].start return local_subs