Package media

Sub-modules

media.audio
media.exception
media.lib
media.picture
media.video

Classes

class Audio (stream, mode='w')
Expand source code
class Audio():
    def __init__(self, stream, mode="w"):
        self.ffmpeg = stream
        self.mode=mode
        self.played=False
        self.stopwatch=self.ffmpeg.stopwatch
    def play(self, channels=2, device=None):
        if not self.ffmpeg.loader["loaded"]: raise WrongOrderError("Stream is not loaded.")
        if self.played:
            self.sdStream.Stop()
        info=self.ffmpeg.info["streams"]["audio"][self.ffmpeg.loader["audio"]]
        self.channels=channels
        self.device=device
        streamOptions={"frames_per_buffer":info["frame_size"]}
        if self.mode == "rw":
            if type(self.device) in [list, tuple]:
                streamOptions.update(input_device_index=self.device[0], output_device_index=self.device[1])
            else:
                streamOptions.update(input_device_index=self.device, output_device_index=self.device)
        elif self.mode == "r":
            streamOptions.update(input_device_index=self.device)
        else:
            streamOptions.update(output_device_index=self.device)
        self.streamOptions=streamOptions
        self.sdStream=Sounddevice(mode=self.mode, dataQueue=self.ffmpeg._audioQ, stopwatch=self.stopwatch, streamOptions={"samplerate":info["sample_rate"], "blocksize":info["frame_size"], "channels":self.channels, "device":self.device})
        self.played=True
        self.sdStream.Play()
    def pause(self):
        if not self.played: raise WrongOrderError("Not played")
        self.sdStream.Pause()
    def resume(self):
        if not self.played: raise WrongOrderError("Not played")
        self.sdStream.fixDataQueue(self.ffmpeg._audioQ)
        self.sdStream.Resume()
    def stop(self):
        if not self.played: raise WrongOrderError("Not played")
        self.sdStream.Stop()
    def close(self):
        self.sdStream.Close()

Methods

def close(self)
Expand source code
def close(self):
    self.sdStream.Close()
def pause(self)
Expand source code
def pause(self):
    if not self.played: raise WrongOrderError("Not played")
    self.sdStream.Pause()
def play(self, channels=2, device=None)
Expand source code
def play(self, channels=2, device=None):
    if not self.ffmpeg.loader["loaded"]: raise WrongOrderError("Stream is not loaded.")
    if self.played:
        self.sdStream.Stop()
    info=self.ffmpeg.info["streams"]["audio"][self.ffmpeg.loader["audio"]]
    self.channels=channels
    self.device=device
    streamOptions={"frames_per_buffer":info["frame_size"]}
    if self.mode == "rw":
        if type(self.device) in [list, tuple]:
            streamOptions.update(input_device_index=self.device[0], output_device_index=self.device[1])
        else:
            streamOptions.update(input_device_index=self.device, output_device_index=self.device)
    elif self.mode == "r":
        streamOptions.update(input_device_index=self.device)
    else:
        streamOptions.update(output_device_index=self.device)
    self.streamOptions=streamOptions
    self.sdStream=Sounddevice(mode=self.mode, dataQueue=self.ffmpeg._audioQ, stopwatch=self.stopwatch, streamOptions={"samplerate":info["sample_rate"], "blocksize":info["frame_size"], "channels":self.channels, "device":self.device})
    self.played=True
    self.sdStream.Play()
def resume(self)
Expand source code
def resume(self):
    if not self.played: raise WrongOrderError("Not played")
    self.sdStream.fixDataQueue(self.ffmpeg._audioQ)
    self.sdStream.Resume()
def stop(self)
Expand source code
def stop(self):
    if not self.played: raise WrongOrderError("Not played")
    self.sdStream.Stop()
class AudioFilter (stream=None, width=None, height=None, format=None, name=None)
Expand source code
class AudioFilter():
    def __init__(self, stream=None, width=None, height=None, format=None, name=None):
        self.filter=Graph()
        self.src=self.filter.add_abuffer(template=stream, width=width, height=height, format=format, name=name)
    def addFilter(self, filter, arg):
        f = self.filter.add(filter=filter, args=arg)
        self.src.link_to(f)
        f.link_to(self.filter.add("buffersink"))#thinking...
        self.filter.configure()
    def Process(self, frame):
        self.filter.push(frame=frame)
        return self.filter.pull()
    def getFilter(self):
        pass
    def availableFilters(self):
        return filters_available

Methods

def Process(self, frame)
Expand source code
def Process(self, frame):
    self.filter.push(frame=frame)
    return self.filter.pull()
def addFilter(self, filter, arg)
Expand source code
def addFilter(self, filter, arg):
    f = self.filter.add(filter=filter, args=arg)
    self.src.link_to(f)
    f.link_to(self.filter.add("buffersink"))#thinking...
    self.filter.configure()
def availableFilters(self)
Expand source code
def availableFilters(self):
    return filters_available
def getFilter(self)
Expand source code
def getFilter(self):
    pass
class Media (url: str, mode: Literal['r', 'w'] = 'r', **options)
Expand source code
class Media():
    def __init__(self, url:str, mode:Literal["r", "w"]="r", **options):
        """
        Initialize media library.
        url(str or file-like object): Media file url. This will pass to ffmpeg.
        mode(str): Opening mode. You can set to 'r' or 'w'.
        **options: Options for PyAV.
        """
        self.ffmpeg=Stream(url, mode, **options)
        self.info=self.ffmpeg.info
        self.ffmpegs=self.ffmpeg.streams
        self.stopwatch=self.ffmpeg.stopwatch
        self.status="pause"
    def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None,
             audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
        """
        Play media.
        audioDevice(int): An number of Device(You can get with media.audio.getDevices func)
        videoFrame(tkinter.Label or tkinter.Canvas): An frame to playing video
        frame_type(str): videoFrame type(Label->label, Canvas->canvas)
        video(int): video stream number
        audio(int): audio stream number
        """
        self.ffmpeg.load(audio=audio, video=video, block=True)
        if not video is None:
            self.video=Video(self.ffmpeg, mode="w")
        if not audio is None:
            self.audio=Audio(self.ffmpeg, mode="w")
        if not audio is None:
            self.audio.play(device=audioDevice)
        if not video is None:
            self.video.play(frame=videoFrame, frame_type=frame_type)
        self.play_options={"audioDevice":audioDevice, "videoFrame":videoFrame, "video":video, "audio":audio, "frame_type":frame_type}
        self.status="play"
    def pause(self):
        """
        Pause playing.
        """
        if self.status != "play": raise exception.WrongOrderError("Media is not playing.")
        self.ffmpeg.pause()
        try:
            self.video.pause()
        except: pass
        try:
            self.audio.pause()
        except: pass
        self.status="pause"
    def resume(self):
        """
        Resume playing.
        """
        if self.status != "pause": raise exception.WrongOrderError("Media is not paused.")
        self.ffmpeg.resume()
        try:
            self.video.resume()
        except: pass
        try:
            self.audio.resume()
        except: pass
        self.status="play"
    def seek(self, point:int):
        """
        Seek media.
        point(int): time/s to seek
        """
        self.Pause()
        self.ffmpeg.seek(point)
        self.Resume()
        self.status="play"
    def stop(self):
        """
        Stop media.
        """
        self.ffmpeg.stop()
        try:
            self.video.stop()
            pass
        except: pass
        try:
            self.audio.stop()
        except: pass
        self.status="stop"
    def close(self):
        """
        Close media.
        """
        if self.status !="stop": self.Stop()
        try:
            self.audio.close()
        except: pass
        try:
            self.video.close()
        except: pass
        self.ffmpeg.close()
        self.status="close"
    Play = play
    Pause = pause
    Resume = resume
    Seek = seek
    Stop = stop
    Close = close

Initialize media library. url(str or file-like object): Media file url. This will pass to ffmpeg. mode(str): Opening mode. You can set to 'r' or 'w'. **options: Options for PyAV.

Methods

def Close(self)
Expand source code
def close(self):
    """
    Close media.
    """
    if self.status !="stop": self.Stop()
    try:
        self.audio.close()
    except: pass
    try:
        self.video.close()
    except: pass
    self.ffmpeg.close()
    self.status="close"

Close media.

def Pause(self)
Expand source code
def pause(self):
    """
    Pause playing.
    """
    if self.status != "play": raise exception.WrongOrderError("Media is not playing.")
    self.ffmpeg.pause()
    try:
        self.video.pause()
    except: pass
    try:
        self.audio.pause()
    except: pass
    self.status="pause"

Pause playing.

def Play(self,
audioDevice: int | None = None,
videoFrame: int | None = None,
video: int | None = None,
audio: int | None = None,
frame_type: Literal['label', 'canvas'] = 'label')
Expand source code
def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None,
         audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
    """
    Play media.
    audioDevice(int): An number of Device(You can get with media.audio.getDevices func)
    videoFrame(tkinter.Label or tkinter.Canvas): An frame to playing video
    frame_type(str): videoFrame type(Label->label, Canvas->canvas)
    video(int): video stream number
    audio(int): audio stream number
    """
    self.ffmpeg.load(audio=audio, video=video, block=True)
    if not video is None:
        self.video=Video(self.ffmpeg, mode="w")
    if not audio is None:
        self.audio=Audio(self.ffmpeg, mode="w")
    if not audio is None:
        self.audio.play(device=audioDevice)
    if not video is None:
        self.video.play(frame=videoFrame, frame_type=frame_type)
    self.play_options={"audioDevice":audioDevice, "videoFrame":videoFrame, "video":video, "audio":audio, "frame_type":frame_type}
    self.status="play"

Play media. audioDevice(int): An number of Device(You can get with media.audio.getDevices func) videoFrame(tkinter.Label or tkinter.Canvas): An frame to playing video frame_type(str): videoFrame type(Label->label, Canvas->canvas) video(int): video stream number audio(int): audio stream number

def Resume(self)
Expand source code
def resume(self):
    """
    Resume playing.
    """
    if self.status != "pause": raise exception.WrongOrderError("Media is not paused.")
    self.ffmpeg.resume()
    try:
        self.video.resume()
    except: pass
    try:
        self.audio.resume()
    except: pass
    self.status="play"

Resume playing.

def Seek(self, point: int)
Expand source code
def seek(self, point:int):
    """
    Seek media.
    point(int): time/s to seek
    """
    self.Pause()
    self.ffmpeg.seek(point)
    self.Resume()
    self.status="play"

Seek media. point(int): time/s to seek

def Stop(self)
Expand source code
def stop(self):
    """
    Stop media.
    """
    self.ffmpeg.stop()
    try:
        self.video.stop()
        pass
    except: pass
    try:
        self.audio.stop()
    except: pass
    self.status="stop"

Stop media.

def close(self)
Expand source code
def close(self):
    """
    Close media.
    """
    if self.status !="stop": self.Stop()
    try:
        self.audio.close()
    except: pass
    try:
        self.video.close()
    except: pass
    self.ffmpeg.close()
    self.status="close"

Close media.

def pause(self)
Expand source code
def pause(self):
    """
    Pause playing.
    """
    if self.status != "play": raise exception.WrongOrderError("Media is not playing.")
    self.ffmpeg.pause()
    try:
        self.video.pause()
    except: pass
    try:
        self.audio.pause()
    except: pass
    self.status="pause"

Pause playing.

def play(self,
audioDevice: int | None = None,
videoFrame: int | None = None,
video: int | None = None,
audio: int | None = None,
frame_type: Literal['label', 'canvas'] = 'label')
Expand source code
def play(self, audioDevice:Optional[int]=None, videoFrame:Optional[int]=None, video:Optional[int]=None,
         audio:Optional[int]=None, frame_type:Literal["label", "canvas"]="label"):
    """
    Play media.
    audioDevice(int): An number of Device(You can get with media.audio.getDevices func)
    videoFrame(tkinter.Label or tkinter.Canvas): An frame to playing video
    frame_type(str): videoFrame type(Label->label, Canvas->canvas)
    video(int): video stream number
    audio(int): audio stream number
    """
    self.ffmpeg.load(audio=audio, video=video, block=True)
    if not video is None:
        self.video=Video(self.ffmpeg, mode="w")
    if not audio is None:
        self.audio=Audio(self.ffmpeg, mode="w")
    if not audio is None:
        self.audio.play(device=audioDevice)
    if not video is None:
        self.video.play(frame=videoFrame, frame_type=frame_type)
    self.play_options={"audioDevice":audioDevice, "videoFrame":videoFrame, "video":video, "audio":audio, "frame_type":frame_type}
    self.status="play"

Play media. audioDevice(int): An number of Device(You can get with media.audio.getDevices func) videoFrame(tkinter.Label or tkinter.Canvas): An frame to playing video frame_type(str): videoFrame type(Label->label, Canvas->canvas) video(int): video stream number audio(int): audio stream number

def resume(self)
Expand source code
def resume(self):
    """
    Resume playing.
    """
    if self.status != "pause": raise exception.WrongOrderError("Media is not paused.")
    self.ffmpeg.resume()
    try:
        self.video.resume()
    except: pass
    try:
        self.audio.resume()
    except: pass
    self.status="play"

Resume playing.

def seek(self, point: int)
Expand source code
def seek(self, point:int):
    """
    Seek media.
    point(int): time/s to seek
    """
    self.Pause()
    self.ffmpeg.seek(point)
    self.Resume()
    self.status="play"

Seek media. point(int): time/s to seek

def stop(self)
Expand source code
def stop(self):
    """
    Stop media.
    """
    self.ffmpeg.stop()
    try:
        self.video.stop()
        pass
    except: pass
    try:
        self.audio.stop()
    except: pass
    self.status="stop"

Stop media.

class Picture
Expand source code
class Picture():
    pass
class Video (stream, mode='w')
Expand source code
class Video():
    def __init__(self, stream, mode="w"):
        self.ffmpeg = stream
        self.mode=mode
        self.played=False
        self.stopwatch=self.ffmpeg.stopwatch
        self.state="stop"
        self.old_frame=None
        self.old_frame2=None
        self.tkimage=None
    def play(self, frame, frame_type):
        if not self.ffmpeg.loader["loaded"]: raise WrongOrderError("Stream is not loaded.")
        info=self.ffmpeg.info["streams"]["video"][self.ffmpeg.loader["video"]]
        self.frame=frame
        self.frame_type=frame_type
        self.state="play"
        self.frame.after(info["start_time"], self._play)
        self.played=True
    def _play(self):
        if self.state=="play":
            try:
                frame=self.ffmpeg._videoQ.get_nowait()
            except Empty:
                self.frame.after(10, self._play)
            else:
                info=self.ffmpeg.info["streams"]["video"][self.ffmpeg.loader["video"]]
                sleep_time=1/info["fps"]
                gap=self.stopwatch.getTime()-frame[0]
                border=0.5
                if gap<float(0):
                    self.frame.after(0, self._play)
                elif gap>float(border):
                    self.frame.after(int(gap), self._play)
                elif gap<=float(border):
                    if not self.old_frame is None:
                        self.old_frame.close()
                        self.old_frame2=self.tkimage
                    self.tkimage=ImageTk.PhotoImage(frame[1], master=self.frame)
                    if self.frame_type=="label":
                        self.frame.configure(image=self.tkimage)
                    self.old_frame=frame[1]
                    self.frame.after(int(sleep_time*1000), self._play)
        elif self.state == "pause":
            pass
        elif self.state == "stop":
            pass
    def pause(self):
        self.state = "pause"
    def resume(self):
        self.state = "play"
        self.frame.after(0, self._play)
    def stop(self):
        self.state="stop"
    def close(self):
        self.state="stop"

Methods

def close(self)
Expand source code
def close(self):
    self.state="stop"
def pause(self)
Expand source code
def pause(self):
    self.state = "pause"
def play(self, frame, frame_type)
Expand source code
def play(self, frame, frame_type):
    if not self.ffmpeg.loader["loaded"]: raise WrongOrderError("Stream is not loaded.")
    info=self.ffmpeg.info["streams"]["video"][self.ffmpeg.loader["video"]]
    self.frame=frame
    self.frame_type=frame_type
    self.state="play"
    self.frame.after(info["start_time"], self._play)
    self.played=True
def resume(self)
Expand source code
def resume(self):
    self.state = "play"
    self.frame.after(0, self._play)
def stop(self)
Expand source code
def stop(self):
    self.state="stop"
class VideoFilter (stream=None, width=None, height=None, format=None, name=None)
Expand source code
class VideoFilter():
    def __init__(self, stream=None, width=None, height=None, format=None, name=None):
        self.filter=Graph()
        self.src=self.filter.add_buffer(template=stream, width=width, height=height, format=format, name=name)
    def addFilter(self, filter, arg):
        f = self.filter.add(filter=filter, args=arg)
        self.src.link_to(f)
        f.link_to(self.filter.add("buffersink"))#thinking...
        self.filter.configure()
    def Process(self, frame):
        self.filter.push(frame=frame)
        return self.filter.pull()
    def getFilter(self):
        pass
    def availableFilters(self):
        return filters_available

Methods

def Process(self, frame)
Expand source code
def Process(self, frame):
    self.filter.push(frame=frame)
    return self.filter.pull()
def addFilter(self, filter, arg)
Expand source code
def addFilter(self, filter, arg):
    f = self.filter.add(filter=filter, args=arg)
    self.src.link_to(f)
    f.link_to(self.filter.add("buffersink"))#thinking...
    self.filter.configure()
def availableFilters(self)
Expand source code
def availableFilters(self):
    return filters_available
def getFilter(self)
Expand source code
def getFilter(self):
    pass