From b019671f124371129ada790029d485ed75c627ed Mon Sep 17 00:00:00 2001
From: Jaikinator
Date: Fri, 9 Jun 2023 18:00:29 +0200
Subject: [PATCH] added files for rework

---
 autotranscript/__init__.py        |   5 +-
 autotranscript/audio_processor.py | 112 ++++++++++++++++++++++++++++++
 autotranscript/diarisation.py     | 110 +++++++++++++++++++++++++++++
 3 files changed, 226 insertions(+), 1 deletion(-)
 create mode 100644 autotranscript/audio_processor.py
 create mode 100644 autotranscript/diarisation.py

diff --git a/autotranscript/__init__.py b/autotranscript/__init__.py
index 13f245b..91c8659 100644
--- a/autotranscript/__init__.py
+++ b/autotranscript/__init__.py
@@ -1,4 +1,7 @@
 from autotranscript.__main__ import *
+from autotranscript.transcriptor import *
+from autotranscript.audio_processor import *
 from autotranscript.version import get_version as _get_version
+from autotranscript.misc import *
 
-__version__ = _get_version()
\ No newline at end of file
+__version__ = _get_version()
diff --git a/autotranscript/audio_processor.py b/autotranscript/audio_processor.py
new file mode 100644
index 0000000..2b8eee8
--- /dev/null
+++ b/autotranscript/audio_processor.py
@@ -0,0 +1,112 @@
+from typing import Union
+from pydub import AudioSegment
+import os
+
+
+class AudioProcessor:
+    """Load an audio or video file with pydub and convert or re-time it."""
+
+    def __init__(self, audio_file: str):
+        """
+        Load the file and derive its name, folder and type from the path.
+        :param audio_file: path to the audio or video file to load
+        """
+        basename, extension = os.path.splitext(os.path.basename(audio_file))
+
+        self.audio_file_path = audio_file
+        # the extension without its dot doubles as the format handed to pydub
+        self.audio_file_type = extension.lstrip('.')
+        # an empty format lets pydub/ffmpeg detect the container on its own
+        self.audio_file = AudioSegment.from_file(audio_file, format=self.audio_file_type or None)
+
+        # splitext copes with extensions of any length ('.flac', '.webm', ...)
+        self.audiofilename = basename
+        self.coreaudiofile = basename
+        self.audiofilefolder = os.path.dirname(audio_file)
+
+    def convert_audio(self, savefolder: str = "", savename: str = "", type: str = "wav", remove_orginal: bool = True):
+        """
+        Convert the loaded file to another format and reload it from the converted copy,
+        so that the audio is in a format the Whisper model can read.
+        :param savefolder: folder for the converted file, defaults to the folder of the source file
+        :param savename: name of the converted file without extension, defaults to the source name
+        :param type: target format, also used as the file extension
+        :param remove_orginal: remove the source file after the conversion
+        :return: this instance, now holding the converted file
+        """
+        print(f'Converting {self.audiofilename} to .{type} file')
+
+        if savefolder == "":
+            savefolder = self.audiofilefolder
+
+        if savename == "":
+            savename = self.coreaudiofile
+
+        savepath = os.path.join(savefolder, f'{savename}.{type}')
+
+        self.audio_file.export(savepath, format=type)
+
+        print(f'Converted {self.audiofilename} to {type}')
+
+        # never delete the file that was just written (e.g. wav -> wav in place)
+        same_file = os.path.realpath(savepath) == os.path.realpath(self.audio_file_path)
+        if remove_orginal and not same_file:
+            os.remove(self.audio_file_path)
+            print(f'File {self.audio_file_path} removed')
+
+        self.audio_file_path = savepath
+        self.audio_file_type = type
+        self.audio_file = AudioSegment.from_file(savepath, format=type)
+
+        return self
+
+    def to_mp3(self, savefolder: str = "", savename: str = "", remove_orginal: bool = True):
+        """
+        Convert the loaded file to mp3, see convert_audio
+        :param savefolder: folder for the mp3 file, defaults to the folder of the source file
+        :param savename: name of the mp3 file without extension, defaults to the source name
+        :param remove_orginal: remove the source file after the conversion
+        :return: this instance, now holding the mp3 file
+        """
+        return self.convert_audio(savefolder=savefolder, savename=savename, type="mp3", remove_orginal=remove_orginal)
+
+    def to_wav(self, savefolder: str = "", savename: str = "", remove_orginal: bool = True):
+        """
+        Convert the loaded file to wav, see convert_audio
+        :param savefolder: folder for the wav file, defaults to the folder of the source file
+        :param savename: name of the wav file without extension, defaults to the source name
+        :param remove_orginal: remove the source file after the conversion
+        :return: this instance, now holding the wav file
+        """
+        return self.convert_audio(savefolder=savefolder, savename=savename, type="wav", remove_orginal=remove_orginal)
+
+    def slower_mp3(self, savefolder: str = "", savename: str = "", speed: float = 0.75, type: str = "mp3"):
+        """
+        Export a copy of the loaded audio played back at a different speed.
+        :param savefolder: folder for the exported file, defaults to the folder of the source file
+        :param savename: name of the exported file without extension,
+            defaults to the source name followed by the speed
+        :param speed: playback speed factor, values below 1 slow the audio down
+        :param type: format and file extension of the exported file
+        :return: the re-timed audio segment
+        """
+        if savefolder == "":
+            savefolder = self.audiofilefolder
+
+        if savename == "":
+            speedstr = str(speed).replace('.', '')
+            savename = f'{self.coreaudiofile}_{speedstr}'
+
+        sound = self.audio_file
+        # the same samples read at a scaled frame rate play back at a different speed
+        slow_sound = sound._spawn(sound.raw_data, overrides={
+            "frame_rate": int(sound.frame_rate * speed)
+        })
+        # resample to the source rate so that encoders get a standard frame rate
+        slow_sound = slow_sound.set_frame_rate(sound.frame_rate)
+
+        save_path = os.path.join(savefolder, f'{savename}.{type}')
+
+        slow_sound.export(save_path, format=type)
+
+        return slow_sound
diff --git a/autotranscript/diarisation.py b/autotranscript/diarisation.py
new file mode 100644
index 0000000..b7ee848
--- /dev/null
+++ b/autotranscript/diarisation.py
@@ -0,0 +1,110 @@
+from itertools import groupby
+from time import time
+import os
+
+from autotranscript.audio_processor import AudioProcessor
+
+
+class Diarisation(AudioProcessor):
+    """Run a speaker diarization model on an audio file and cut the audio per speaker turn."""
+
+    def __init__(self, audio_file: str, model, **kwargs) -> None:
+        """
+        :param audio_file: path to the audio file to diarize
+        :param model: callable invoked as model(path, num_speakers=...)
+        :param kwargs: accepted but not used
+        """
+        super().__init__(audio_file=audio_file)
+
+        self.model = model
+        # result of the last diarization run, None until diarization() was called
+        self.diarization_result = None
+
+    def diarization(self, *args, **kwargs):
+        """
+        Run the diarization model on the loaded audio file.
+        :param kwargs: num_speakers sets the number of speakers, defaults to 2
+        :return: the diarization returned by the model
+        """
+        num_speakers = kwargs.pop('num_speakers', 2)
+
+        print(f'Start diarization of audio file: {self.audiofilename}')
+
+        _stime = time()
+
+        result = self.model(self.audio_file_path, num_speakers=num_speakers)
+
+        print(f'Diarization finished in {time() - _stime} seconds')
+
+        # stored under its own name: assigning to self.diarization would shadow this method
+        self.diarization_result = result
+
+        return result
+
+    def format_diarization_output(self, *args, **kwargs):
+        """
+        Collect the diarization tracks and merge consecutive segments of the same speaker.
+        Runs the diarization first if it has not been run yet.
+        :param args: passed on to diarization() if it has to be run
+        :param kwargs: passed on to diarization() if it has to be run
+        :return: dict with the parallel lists "speakers" and "segments", and a list of
+            [first segment index, last segment index, speaker] entries, one per speaker turn
+        """
+        if getattr(self, 'diarization_result', None) is None:
+            # ensure diarization is run before formatting
+            self.diarization(*args, **kwargs)
+
+        diarization_output = {"speakers": [], "segments": []}
+
+        for segment, _, speaker in self.diarization_result.itertracks(yield_label=True):
+            diarization_output["speakers"].append(speaker)
+            diarization_output["segments"].append(segment)
+
+        normalized_output = []
+        index_start_speaker = 0
+
+        # every run of consecutive identical speakers becomes one entry
+        for speaker, run in groupby(diarization_output["speakers"]):
+            index_end_speaker = index_start_speaker + len(list(run)) - 1
+            normalized_output.append([index_start_speaker, index_end_speaker, speaker])
+            index_start_speaker = index_end_speaker + 1
+
+        self.normalized_output = normalized_output
+        self.diarization_output = diarization_output
+
+        return diarization_output, normalized_output
+
+    def create_temporary_wav(self, savefolder: str = "", savename: str = "", *args, **kwargs):
+        """
+        Cut the audio into one wav file per speaker turn.
+        :param savefolder: folder to save the wav files in, defaults to .temp
+        :param savename: optional prefix for the names of the wav files
+        :return: absolute path of the folder holding the wav files
+        """
+        folder = os.path.realpath(savefolder if savefolder != "" else '.temp')
+        os.makedirs(folder, exist_ok=True)
+
+        if not hasattr(self, 'normalized_output') or not hasattr(self, 'diarization_output'):
+            self.format_diarization_output()
+
+        prefix = f'{savename}_' if savename != "" else ''
+        segments = self.diarization_output["segments"]
+
+        for count, (first, last, speaker) in enumerate(self.normalized_output):
+            # pydub slices by milliseconds, the boundaries are assumed to be seconds
+            start_milliseconds = segments[first].start * 1000
+            end_milliseconds = segments[last].end * 1000
+
+            cut_audio = self.audio_file[start_milliseconds:end_milliseconds]
+
+            wav_path = os.path.join(folder, f'{prefix}{count}_speaker_{speaker}.wav')
+            print(f'save audio: {wav_path}')
+            cut_audio.export(wav_path, format="wav")
+
+        return folder
+
+    def __repr__(self):
+        return f"{type(self).__name__}(audio_file={self.audio_file_path!r}, model={self.model!r})"
+
+    def __str__(self):
+        return repr(self)