Package media
Sub-modules
media.audio
media.exception
media.lib
media.picture
media.video
Classes
class Audio (stream, mode='w')
-
Expand source code
class Audio():
    """Drive a ``Sounddevice`` stream from the audio queue of a loaded stream."""

    def __init__(self, stream, mode="w"):
        """
        Bind this player to a stream.

        stream: Object exposing ``stopwatch``, ``loader``, ``info`` and
            ``_audioQ``; those attributes are read by the methods below.
        mode(str): "rw", "r" or anything else (default "w"); selects which
            device-index keys ``play`` fills in.
        """
        self.ffmpeg = stream
        self.mode = mode
        self.played = False
        self.stopwatch = self.ffmpeg.stopwatch

    def play(self, channels=2, device=None):
        """
        Create a ``Sounddevice`` stream for the selected audio stream and start it.

        channels(int): Channel count handed to ``Sounddevice``.
        device: Device index, or a (input, output) list/tuple when mode is "rw".

        Raises WrongOrderError when the stream is not loaded yet.
        """
        if not self.ffmpeg.loader["loaded"]:
            raise WrongOrderError("Stream is not loaded.")
        if self.played:
            # A previous stream exists: stop it before building a new one.
            self.sdStream.Stop()
        info = self.ffmpeg.info["streams"]["audio"][self.ffmpeg.loader["audio"]]
        self.channels = channels
        self.device = device
        streamOptions = {"frames_per_buffer": info["frame_size"]}
        if self.mode == "rw":
            # isinstance also accepts list/tuple subclasses (e.g. namedtuple).
            if isinstance(self.device, (list, tuple)):
                streamOptions.update(input_device_index=self.device[0],
                                     output_device_index=self.device[1])
            else:
                streamOptions.update(input_device_index=self.device,
                                     output_device_index=self.device)
        elif self.mode == "r":
            streamOptions.update(input_device_index=self.device)
        else:
            streamOptions.update(output_device_index=self.device)
        # Kept on the instance only; the Sounddevice call below receives its
        # own, separately built option dict.
        self.streamOptions = streamOptions
        self.sdStream = Sounddevice(mode=self.mode,
                                    dataQueue=self.ffmpeg._audioQ,
                                    stopwatch=self.stopwatch,
                                    streamOptions={"samplerate": info["sample_rate"],
                                                   "blocksize": info["frame_size"],
                                                   "channels": self.channels,
                                                   "device": self.device})
        self.played = True
        self.sdStream.Play()

    def pause(self):
        """Pause the stream. Raises WrongOrderError if ``play`` was never called."""
        if not self.played:
            raise WrongOrderError("Not played")
        self.sdStream.Pause()

    def resume(self):
        """Re-attach the audio queue and resume. Raises WrongOrderError if never played."""
        if not self.played:
            raise WrongOrderError("Not played")
        self.sdStream.fixDataQueue(self.ffmpeg._audioQ)
        self.sdStream.Resume()

    def stop(self):
        """Stop the stream. Raises WrongOrderError if ``play`` was never called."""
        if not self.played:
            raise WrongOrderError("Not played")
        self.sdStream.Stop()

    def close(self):
        """Close the stream; a no-op when ``play`` never created one."""
        if not self.played:
            # sdStream only exists after play(); nothing to close yet.
            return
        self.sdStream.Close()
Methods
def close(self)
-
Expand source code
def close(self):
    """Close the sounddevice stream; a no-op when play() never created one."""
    if not self.played:
        # self.sdStream is only assigned inside play(); without this guard
        # closing an unplayed instance raises AttributeError.
        return
    self.sdStream.Close()
def pause(self)
-
Expand source code
def pause(self):
    """
    Pause the sounddevice stream.

    Raises WrongOrderError("Not played") when play() has not run yet.
    """
    if not self.played:
        raise WrongOrderError("Not played")
    output = self.sdStream
    output.Pause()
def play(self, channels=2, device=None)
-
Expand source code
def play(self, channels=2, device=None):
    """
    Create a ``Sounddevice`` stream for the selected audio stream and start it.

    channels(int): Channel count handed to ``Sounddevice``.
    device: Device index, or a (input, output) list/tuple when mode is "rw".

    Raises WrongOrderError when the stream is not loaded yet.
    """
    if not self.ffmpeg.loader["loaded"]:
        raise WrongOrderError("Stream is not loaded.")
    if self.played:
        # A previous stream exists: stop it before building a new one.
        self.sdStream.Stop()
    info = self.ffmpeg.info["streams"]["audio"][self.ffmpeg.loader["audio"]]
    self.channels = channels
    self.device = device
    streamOptions = {"frames_per_buffer": info["frame_size"]}
    if self.mode == "rw":
        # isinstance also accepts list/tuple subclasses (e.g. namedtuple),
        # which the former exact-type comparison rejected.
        if isinstance(self.device, (list, tuple)):
            streamOptions.update(input_device_index=self.device[0],
                                 output_device_index=self.device[1])
        else:
            streamOptions.update(input_device_index=self.device,
                                 output_device_index=self.device)
    elif self.mode == "r":
        streamOptions.update(input_device_index=self.device)
    else:
        streamOptions.update(output_device_index=self.device)
    # Kept on the instance only; the Sounddevice call below receives its own,
    # separately built option dict.
    self.streamOptions = streamOptions
    self.sdStream = Sounddevice(mode=self.mode,
                                dataQueue=self.ffmpeg._audioQ,
                                stopwatch=self.stopwatch,
                                streamOptions={"samplerate": info["sample_rate"],
                                               "blocksize": info["frame_size"],
                                               "channels": self.channels,
                                               "device": self.device})
    self.played = True
    self.sdStream.Play()
def resume(self)
-
Expand source code
def resume(self):
    """
    Hand the current audio queue back to the stream, then resume it.

    Raises WrongOrderError("Not played") when play() has not run yet.
    """
    if not self.played:
        raise WrongOrderError("Not played")
    queue = self.ffmpeg._audioQ
    self.sdStream.fixDataQueue(queue)
    self.sdStream.Resume()
def stop(self)
-
Expand source code
def stop(self):
    """
    Stop the sounddevice stream.

    Raises WrongOrderError("Not played") when play() has not run yet.
    """
    if not self.played:
        raise WrongOrderError("Not played")
    output = self.sdStream
    output.Stop()
class AudioFilter (stream=None, width=None, height=None, format=None, name=None)
-
Expand source code
class AudioFilter():
    """Wrap a filter ``Graph`` whose source is an audio buffer (``abuffer``)."""

    def __init__(self, stream=None, width=None, height=None, format=None,
                 name=None, sample_rate=None, layout=None, channels=None):
        """
        Create the graph and its audio buffer source.

        stream: Template passed to ``add_abuffer``.
        width, height: Accepted for backward compatibility only. They are not
            forwarded, because ``add_abuffer`` takes no such keywords and
            rejects them with TypeError.
        format, name: Forwarded to ``add_abuffer``.
        sample_rate, layout, channels: Optional audio parameters forwarded to
            ``add_abuffer``.
        """
        self.filter = Graph()
        self.src = self.filter.add_abuffer(template=stream,
                                           sample_rate=sample_rate,
                                           format=format,
                                           layout=layout,
                                           channels=channels,
                                           name=name)

    def addFilter(self, filter, arg):
        """
        Insert ``filter`` between the source and a new ``buffersink``, then configure.

        filter: Filter name handed to ``Graph.add``.
        arg: Argument string handed to ``Graph.add`` as ``args``.
        """
        f = self.filter.add(filter=filter, args=arg)
        self.src.link_to(f)
        # NOTE(review): every call adds another buffersink and re-links the
        # source; the original marks this line as unfinished ("thinking...").
        f.link_to(self.filter.add("buffersink"))
        self.filter.configure()

    def Process(self, frame):
        """Push ``frame`` into the graph and return whatever ``pull`` yields."""
        self.filter.push(frame=frame)
        return self.filter.pull()

    def getFilter(self):
        """Placeholder; does nothing and returns None."""
        pass

    def availableFilters(self):
        """Return the module-level ``filters_available`` object."""
        return filters_available
Methods
def Process(self, frame)
-
Expand source code
def Process(self, frame):
    """Feed ``frame`` to the filter graph and return the result of ``pull()``."""
    graph = self.filter
    graph.push(frame=frame)
    return graph.pull()
def addFilter(self, filter, arg)
-
Expand source code
def addFilter(self, filter, arg):
    """
    Link source -> ``filter`` -> a new "buffersink" and configure the graph.

    filter: Filter name passed to the graph's ``add``.
    arg: Value passed to ``add`` as ``args``.
    """
    node = self.filter.add(filter=filter, args=arg)
    self.src.link_to(node)
    # The original marks this step as unfinished ("thinking...").
    sink = self.filter.add("buffersink")
    node.link_to(sink)
    self.filter.configure()
def availableFilters(self)
-
Expand source code
def availableFilters(self):
    """Return the module-level ``filters_available`` object unchanged."""
    return filters_available
def getFilter(self)
-
Expand source code
def getFilter(self):
    """Placeholder with no behavior yet; always returns None."""
    return None
class Media (url: str, mode: Literal['r', 'w'] = 'r', **options)
-
Expand source code
class Media():
    """Coordinate a ``Stream`` with optional ``Video`` and ``Audio`` players."""

    def __init__(self, url:str, mode:Literal["r", "w"]="r", **options):
        """
        Initialize media library.

        url(str or file-like object): Media file url. It is passed to ffmpeg.
        mode(str): Opening mode. You can set it to 'r' or 'w'.
        **options: Options for PyAV.
        """
        self.ffmpeg = Stream(url, mode, **options)
        self.info = self.ffmpeg.info
        self.ffmpegs = self.ffmpeg.streams
        self.stopwatch = self.ffmpeg.stopwatch
        self.status = "pause"

    def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None, audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
        """
        Play media.

        audioDevice(int): A device number (you can get it with the media.audio.getDevices function).
        videoFrame(tkinter.Label or tkinter.Canvas): The widget in which the video is played.
        frame_type(str): The videoFrame type (Label->label, Canvas->canvas).
        video(int): The video stream number; None skips video.
        audio(int): The audio stream number; None skips audio.
        """
        self.ffmpeg.load(audio=audio, video=video, block=True)
        if video is not None:
            self.video = Video(self.ffmpeg, mode="w")
        if audio is not None:
            self.audio = Audio(self.ffmpeg, mode="w")
        if audio is not None:
            self.audio.play(device=audioDevice)
        if video is not None:
            self.video.play(frame=videoFrame, frame_type=frame_type)
        self.play_options = {"audioDevice":audioDevice, "videoFrame":videoFrame, "video":video, "audio":audio, "frame_type":frame_type}
        self.status = "play"

    def pause(self):
        """
        Pause playing.

        Raises exception.WrongOrderError when the status is not "play".
        """
        if self.status != "play":
            raise exception.WrongOrderError("Media is not playing.")
        self.ffmpeg.pause()
        # Best effort: self.video / self.audio only exist when play() selected
        # them. Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        try:
            self.video.pause()
        except Exception:
            pass
        try:
            self.audio.pause()
        except Exception:
            pass
        self.status = "pause"

    def resume(self):
        """
        Resume playing.

        Raises exception.WrongOrderError when the status is not "pause".
        """
        if self.status != "pause":
            raise exception.WrongOrderError("Media is not paused.")
        self.ffmpeg.resume()
        # Best effort, see pause().
        try:
            self.video.resume()
        except Exception:
            pass
        try:
            self.audio.resume()
        except Exception:
            pass
        self.status = "play"

    def seek(self, point:int):
        """
        Seek media.

        point(int): time/s to seek

        Calls Pause() first, so exception.WrongOrderError is raised when the
        status is not "play".
        """
        self.Pause()
        self.ffmpeg.seek(point)
        self.Resume()
        self.status = "play"

    def stop(self):
        """
        Stop media.
        """
        self.ffmpeg.stop()
        # Best effort, see pause().
        try:
            self.video.stop()
        except Exception:
            pass
        try:
            self.audio.stop()
        except Exception:
            pass
        self.status = "stop"

    def close(self):
        """
        Close media. Stops first unless the status is already "stop".
        """
        if self.status != "stop":
            self.Stop()
        # Best effort, see pause().
        try:
            self.audio.close()
        except Exception:
            pass
        try:
            self.video.close()
        except Exception:
            pass
        self.ffmpeg.close()
        self.status = "close"

    # Capitalized aliases of the methods above.
    Play = play
    Pause = pause
    Resume = resume
    Seek = seek
    Stop = stop
    Close = close
Initialize the media library. url(str or file-like object): Media file URL; it is passed to ffmpeg. mode(str): Opening mode, either 'r' or 'w'. **options: Options for PyAV.
Methods
def Close(self)
-
Expand source code
def close(self):
    """
    Close media. Stops first unless the status is already "stop".
    """
    if self.status != "stop":
        self.Stop()
    # Best effort: the audio/video attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.audio.close()
    except Exception:
        pass
    try:
        self.video.close()
    except Exception:
        pass
    self.ffmpeg.close()
    self.status = "close"
Close media.
def Pause(self)
-
Expand source code
def pause(self):
    """
    Pause playing.

    Raises exception.WrongOrderError when the status is not "play".
    """
    if self.status != "play":
        raise exception.WrongOrderError("Media is not playing.")
    self.ffmpeg.pause()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.pause()
    except Exception:
        pass
    try:
        self.audio.pause()
    except Exception:
        pass
    self.status = "pause"
Pause playing.
def Play(self,
audioDevice: int | None = None,
videoFrame: int | None = None,
video: int | None = None,
audio: int | None = None,
frame_type: Literal['label', 'canvas'] = 'label')-
Expand source code
def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None, audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
    """
    Play media.

    audioDevice(int): A device number (you can get it with the media.audio.getDevices function).
    videoFrame(tkinter.Label or tkinter.Canvas): The widget in which the video is played.
    frame_type(str): The videoFrame type (Label->label, Canvas->canvas).
    video(int): The video stream number; None skips video.
    audio(int): The audio stream number; None skips audio.
    """
    wants_video = video is not None
    wants_audio = audio is not None
    self.ffmpeg.load(audio=audio, video=video, block=True)
    # Both players are constructed before either one is started.
    if wants_video:
        self.video = Video(self.ffmpeg, mode="w")
    if wants_audio:
        self.audio = Audio(self.ffmpeg, mode="w")
    if wants_audio:
        self.audio.play(device=audioDevice)
    if wants_video:
        self.video.play(frame=videoFrame, frame_type=frame_type)
    self.play_options = dict(
        audioDevice=audioDevice,
        videoFrame=videoFrame,
        video=video,
        audio=audio,
        frame_type=frame_type,
    )
    self.status = "play"
Play media. audioDevice(int): A device number (you can get it with the media.audio.getDevices function). videoFrame(tkinter.Label or tkinter.Canvas): The widget in which the video is played. frame_type(str): The videoFrame type (Label->label, Canvas->canvas). video(int): The video stream number. audio(int): The audio stream number.
def Resume(self)
-
Expand source code
def resume(self):
    """
    Resume playing.

    Raises exception.WrongOrderError when the status is not "pause".
    """
    if self.status != "pause":
        raise exception.WrongOrderError("Media is not paused.")
    self.ffmpeg.resume()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.resume()
    except Exception:
        pass
    try:
        self.audio.resume()
    except Exception:
        pass
    self.status = "play"
Resume playing.
def Seek(self, point: int)
-
Expand source code
def seek(self, point:int):
    """
    Seek media.

    point(int): time/s to seek

    Runs Pause(), seeks the underlying stream, then Resume(), and finally
    sets the status to "play".
    """
    self.Pause()
    stream = self.ffmpeg
    stream.seek(point)
    self.Resume()
    self.status = "play"
Seek media. point(int): The time, in seconds, to seek to.
def Stop(self)
-
Expand source code
def stop(self):
    """
    Stop media.
    """
    self.ffmpeg.stop()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.stop()
    except Exception:
        pass
    try:
        self.audio.stop()
    except Exception:
        pass
    self.status = "stop"
Stop media.
def close(self)
-
Expand source code
def close(self):
    """
    Close media. Stops first unless the status is already "stop".
    """
    if self.status != "stop":
        self.Stop()
    # Best effort: the audio/video attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.audio.close()
    except Exception:
        pass
    try:
        self.video.close()
    except Exception:
        pass
    self.ffmpeg.close()
    self.status = "close"
Close media.
def pause(self)
-
Expand source code
def pause(self):
    """
    Pause playing.

    Raises exception.WrongOrderError when the status is not "play".
    """
    if self.status != "play":
        raise exception.WrongOrderError("Media is not playing.")
    self.ffmpeg.pause()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.pause()
    except Exception:
        pass
    try:
        self.audio.pause()
    except Exception:
        pass
    self.status = "pause"
Pause playing.
def play(self,
audioDevice: int | None = None,
videoFrame: int | None = None,
video: int | None = None,
audio: int | None = None,
frame_type: Literal['label', 'canvas'] = 'label')-
Expand source code
def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None, audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
    """
    Play media.

    audioDevice(int): A device number (you can get it with the media.audio.getDevices function).
    videoFrame(tkinter.Label or tkinter.Canvas): The widget in which the video is played.
    frame_type(str): The videoFrame type (Label->label, Canvas->canvas).
    video(int): The video stream number; None skips video.
    audio(int): The audio stream number; None skips audio.
    """
    wants_video = video is not None
    wants_audio = audio is not None
    self.ffmpeg.load(audio=audio, video=video, block=True)
    # Both players are constructed before either one is started.
    if wants_video:
        self.video = Video(self.ffmpeg, mode="w")
    if wants_audio:
        self.audio = Audio(self.ffmpeg, mode="w")
    if wants_audio:
        self.audio.play(device=audioDevice)
    if wants_video:
        self.video.play(frame=videoFrame, frame_type=frame_type)
    self.play_options = dict(
        audioDevice=audioDevice,
        videoFrame=videoFrame,
        video=video,
        audio=audio,
        frame_type=frame_type,
    )
    self.status = "play"
Play media. audioDevice(int): A device number (you can get it with the media.audio.getDevices function). videoFrame(tkinter.Label or tkinter.Canvas): The widget in which the video is played. frame_type(str): The videoFrame type (Label->label, Canvas->canvas). video(int): The video stream number. audio(int): The audio stream number.
def resume(self)
-
Expand source code
def resume(self):
    """
    Resume playing.

    Raises exception.WrongOrderError when the status is not "pause".
    """
    if self.status != "pause":
        raise exception.WrongOrderError("Media is not paused.")
    self.ffmpeg.resume()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.resume()
    except Exception:
        pass
    try:
        self.audio.resume()
    except Exception:
        pass
    self.status = "play"
Resume playing.
def seek(self, point: int)
-
Expand source code
def seek(self, point:int):
    """
    Seek media.

    point(int): time/s to seek

    Runs Pause(), seeks the underlying stream, then Resume(), and finally
    sets the status to "play".
    """
    self.Pause()
    stream = self.ffmpeg
    stream.seek(point)
    self.Resume()
    self.status = "play"
Seek media. point(int): The time, in seconds, to seek to.
def stop(self)
-
Expand source code
def stop(self):
    """
    Stop media.
    """
    self.ffmpeg.stop()
    # Best effort: the video/audio attributes may not exist. Exception (not a
    # bare except) lets KeyboardInterrupt and SystemExit propagate.
    try:
        self.video.stop()
    except Exception:
        pass
    try:
        self.audio.stop()
    except Exception:
        pass
    self.status = "stop"
Stop media.
class Picture
-
Expand source code
class Picture:
    """Empty placeholder class; defines no attributes or methods."""
class Video (stream, mode='w')
-
Expand source code
class Video():
    """Show frames taken from a stream's video queue on a Tk widget via ``after`` callbacks."""

    def __init__(self, stream, mode="w"):
        """
        Bind the player to ``stream``.

        stream: Object exposing ``stopwatch``, ``loader``, ``info`` and ``_videoQ``.
        mode(str): Stored on the instance; not read by the methods of this class.
        """
        self.ffmpeg = stream
        self.mode=mode
        self.played=False
        self.stopwatch=self.ffmpeg.stopwatch
        self.state="stop"
        self.old_frame=None    # last frame image shown; closed when replaced
        self.old_frame2=None   # previous PhotoImage; only ever assigned here and in _play
        self.tkimage=None      # PhotoImage currently handed to the widget

    def play(self, frame, frame_type):
        """
        Start the display loop on ``frame``.

        frame: Widget providing ``after`` (and ``configure`` when frame_type is "label").
        frame_type(str): Only the value "label" triggers a ``configure(image=...)`` call.

        Raises WrongOrderError when the stream is not loaded yet.
        """
        if not self.ffmpeg.loader["loaded"]:
            raise WrongOrderError("Stream is not loaded.")
        info=self.ffmpeg.info["streams"]["video"][self.ffmpeg.loader["video"]]
        self.frame=frame
        self.frame_type=frame_type
        self.state="play"
        # First tick is delayed by the stream's start_time value.
        self.frame.after(info["start_time"], self._play)
        self.played=True

    def _play(self):
        """One tick of the display loop; reschedules itself only while state is "play"."""
        if self.state=="play":
            try:
                frame=self.ffmpeg._videoQ.get_nowait()
            except Empty:
                # Nothing queued yet: poll again after 10 (after() delay units).
                self.frame.after(10, self._play)
            else:
                info=self.ffmpeg.info["streams"]["video"][self.ffmpeg.loader["video"]]
                sleep_time=1/info["fps"]
                # frame[0] is compared with the stopwatch time; presumably the
                # frame's timestamp -- TODO confirm.
                gap=self.stopwatch.getTime()-frame[0]
                border=0.5
                if gap<float(0):
                    # Dequeued frame is not displayed; next tick runs immediately.
                    self.frame.after(0, self._play)
                elif gap>float(border):
                    # Dequeued frame is not displayed.
                    # NOTE(review): int(gap) is used directly as the after()
                    # delay while the normal path multiplies by 1000 -- confirm units.
                    self.frame.after(int(gap), self._play)
                elif gap<=float(border):
                    if not self.old_frame is None:
                        self.old_frame.close()
                    self.old_frame2=self.tkimage
                    self.tkimage=ImageTk.PhotoImage(frame[1], master=self.frame)
                    if self.frame_type=="label":
                        self.frame.configure(image=self.tkimage)
                    self.old_frame=frame[1]
                    # Next tick after one frame period (1/fps, scaled by 1000).
                    self.frame.after(int(sleep_time*1000), self._play)
        elif self.state == "pause":
            pass
        elif self.state == "stop":
            pass

    def pause(self):
        """Set state to "pause"; the next scheduled tick then does nothing."""
        self.state = "pause"

    def resume(self):
        """Set state to "play" and schedule a tick immediately."""
        self.state = "play"
        self.frame.after(0, self._play)

    def stop(self):
        """Set state to "stop"."""
        self.state="stop"

    def close(self):
        """Set state to "stop"; nothing else is released here."""
        self.state="stop"
Methods
def close(self)
-
Expand source code
def close(self):
    """Mark the player as stopped; nothing else is released here."""
    self.state = "stop"
def pause(self)
-
Expand source code
def pause(self):
    """Switch the player state to "pause"."""
    self.state = "pause"
def play(self, frame, frame_type)
-
Expand source code
def play(self, frame, frame_type):
    """
    Start the display loop on ``frame``.

    frame: Widget whose ``after`` method schedules the first ``_play`` tick.
    frame_type: Stored on the instance as ``frame_type``.

    Raises WrongOrderError("Stream is not loaded.") when the stream is not loaded.
    """
    if not self.ffmpeg.loader["loaded"]:
        raise WrongOrderError("Stream is not loaded.")
    video_index = self.ffmpeg.loader["video"]
    info = self.ffmpeg.info["streams"]["video"][video_index]
    self.frame = frame
    self.frame_type = frame_type
    self.state = "play"
    # First tick is delayed by the stream's start_time value.
    start_delay = info["start_time"]
    self.frame.after(start_delay, self._play)
    self.played = True
def resume(self)
-
Expand source code
def resume(self):
    """Switch the state back to "play" and schedule ``_play`` with delay 0."""
    self.state = "play"
    widget = self.frame
    widget.after(0, self._play)
def stop(self)
-
Expand source code
def stop(self):
    """Switch the player state to "stop"."""
    self.state = "stop"
class VideoFilter (stream=None, width=None, height=None, format=None, name=None)
-
Expand source code
class VideoFilter():
    """Wrap a filter ``Graph`` whose source is a video buffer."""

    def __init__(self, stream=None, width=None, height=None, format=None, name=None):
        """
        Create the graph and its video buffer source.

        stream: Template passed to ``add_buffer``.
        width, height, format, name: Forwarded unchanged to ``add_buffer``.
        """
        graph = Graph()
        self.filter = graph
        self.src = graph.add_buffer(
            template=stream,
            width=width,
            height=height,
            format=format,
            name=name,
        )

    def addFilter(self, filter, arg):
        """
        Link source -> ``filter`` -> a new "buffersink" and configure the graph.

        filter: Filter name passed to the graph's ``add``.
        arg: Value passed to ``add`` as ``args``.
        """
        node = self.filter.add(filter=filter, args=arg)
        self.src.link_to(node)
        # The original marks this step as unfinished ("thinking...").
        sink = self.filter.add("buffersink")
        node.link_to(sink)
        self.filter.configure()

    def Process(self, frame):
        """Feed ``frame`` to the filter graph and return the result of ``pull()``."""
        graph = self.filter
        graph.push(frame=frame)
        return graph.pull()

    def getFilter(self):
        """Placeholder with no behavior yet; always returns None."""
        return None

    def availableFilters(self):
        """Return the module-level ``filters_available`` object unchanged."""
        return filters_available
Methods
def Process(self, frame)
-
Expand source code
def Process(self, frame):
    """Feed ``frame`` to the filter graph and return the result of ``pull()``."""
    graph = self.filter
    graph.push(frame=frame)
    return graph.pull()
def addFilter(self, filter, arg)
-
Expand source code
def addFilter(self, filter, arg):
    """
    Link source -> ``filter`` -> a new "buffersink" and configure the graph.

    filter: Filter name passed to the graph's ``add``.
    arg: Value passed to ``add`` as ``args``.
    """
    node = self.filter.add(filter=filter, args=arg)
    self.src.link_to(node)
    # The original marks this step as unfinished ("thinking...").
    sink = self.filter.add("buffersink")
    node.link_to(sink)
    self.filter.configure()
def availableFilters(self)
-
Expand source code
def availableFilters(self):
    """Return the module-level ``filters_available`` object unchanged."""
    return filters_available
def getFilter(self)
-
Expand source code
def getFilter(self):
    """Placeholder with no behavior yet; always returns None."""
    return None