Skip to content

For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview

MPlotExample

Python Code

from PySide6.QtWidgets import QApplication, QMainWindow, QWidget, QVBoxLayout, QSizePolicy
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from mmm_python import *
from umap import UMAP
from sklearn.neighbors import KDTree
import librosa
import numpy as np
import pickle
from sklearn.preprocessing import StandardScaler
import argparse

def main():
    parser = argparse.ArgumentParser(description="Run the MPlotExample.")
    parser.add_argument("--analysis", type=str, help="Path to pickled analysis dict with keys 'data_umap' and 'slice_points'", required=False)
    args = parser.parse_args()

    if not args.analysis:
        # parameters for analysis
        d = {
            # "path": "resources/Shiverer.wav",
            "path": "/Users/ted/Desktop/all_flucoma.wav",
            # threshold for spectral flux onset detection, lower is more sensitive, higher is less sensitive
            "thresh":2.0,
            # minimum length of slices in seconds
            "min_slice_len":0.1,
            # window size and hop size used for all analyses
            "window_size":1024,
            "hop_size":512,
            # num mfcc coefficients to compute (including 0th), we will discard the 0th coefficient later
            "num_coeffs": 14
        }

        # use librosa to load audio file to get sample rate and samples for plotting waveform
        y, sr = librosa.load(d["path"], sr=None)

        # slice using spectral flux
        slice_points = MBufAnalysis.spectral_flux_onsets(d)

        print("num slice points:", len(slice_points))
        print("avg slice duration:", np.diff(slice_points).mean() / sr)

        slice_points = np.insert(slice_points, 0, 0) # add start of file as first slice point
        slice_points = np.append(slice_points, len(y)) # add end of file as last slice point
        data = np.ndarray((len(slice_points)-1, 13)) # create array to hold slice features

        for i in range(len(slice_points)-1):
            start = int(slice_points[i])
            end = int(slice_points[i+1])
            print(f"Slice {i} / {len(slice_points)-1}: start={start}, end={end}, duration={(end-start)/sr:.2f} seconds")
            d["start_frame"] = start
            d["num_frames"] = end - start
            mfccs = MBufAnalysis.mfcc(d)
            # remove 0th coefficient and take mean across time axis to get one feature vector per slice
            data[i] = mfccs[:, 1:].mean(axis=0)

        data = StandardScaler().fit_transform(data)

        print("data shape:", data.shape)

        data_umap = UMAP(n_components=2,learning_rate=0.1,min_dist=0.01,n_epochs=200).fit_transform(data)

        # pickle analysis
        with open("examples/MPlotExample_Analysis.pkl", "wb") as f:
            pickle.dump({
                "data_umap": data_umap,
                "slice_points": slice_points,
                "path": d["path"]
            }, f)
    else:
        with open(args.analysis, "rb") as f:
            d = pickle.load(f)
        data_umap = d["data_umap"]
        slice_points = d["slice_points"]
        y, sr = librosa.load(d["path"], sr=None)

    kdtree = KDTree(data_umap)

    ma = MMMAudio(128,graph_name="MPlotExample", package_name="examples", in_device=None, out_device="default")
    ma.send_string("load_sound", d["path"])

    prev = None

    def get_nearest(view, x, y, button, is_dragging, key, dblclick, step):
        nonlocal prev
        if step is None:
            dist, idx = kdtree.query([[x, y]], k=1)
            nearest = int(idx[0][0])
            if nearest != prev:
                prev = nearest
                start = slice_points[nearest]
                num = slice_points[nearest+1] - start

                view.highlight_index(nearest)

                waveform_win.highlight(start, num)

                ma.send_ints("play_data", [start, num])
                print(f"x: {x:.2f}, y: {y:.2f}")
                print(f"Nearest idx: {idx[0][0]}, dist: {dist[0][0]:.4f}")

    app = QApplication([])
    main = QMainWindow()
    root = QWidget()
    layout = QVBoxLayout(root)

    win = MPlot(data_umap, mouse_callback=get_nearest,xlabel="UMAP 1", ylabel="UMAP 2")
    waveform_win = MWaveform(y, slice_points)
    waveform_win.setFixedHeight(220)
    waveform_win.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)

    layout.addWidget(win)
    layout.addWidget(waveform_win)
    layout.setStretch(0, 1)
    layout.setStretch(1, 0)
    main.setCentralWidget(root)

    def shutdown_audio():
        ma.stop_audio()
        ma.stop_process()

    app.aboutToQuit.connect(shutdown_audio)
    main.closeEvent = lambda event: (shutdown_audio(), event.accept())

    main.resize(900, 850)
    main.show()

    ma.start_audio()

    sys.exit(app.exec())

if __name__ == "__main__":
    main()

Mojo Code

from mmm_audio import *

struct MPlotExampleGrain[num_chans: Int = 1](Movable, Copyable):
    var world: World
    var start_frame: Int
    var num_frames: Int
    var player: Play
    var env: Env
    var envTrigger: Bool

    def __init__(out self, world: World):
        self.world = world
        self.start_frame = 0
        self.num_frames = 0
        self.env = Env(self.world)
        self.env.params.values = [0.0, 1.0, 1.0, 0.0]
        self.env.params.times = [0.03, 1.0, 0.03]
        self.env.params.curves = [0.0, 0.0, 0.0]
        self.envTrigger = False

        self.player = Play(self.world)

    def start(mut self, buf: SIMDBuffer[Self.num_chans], start_frame: Int, num_frames: Int):
        self.start_frame = start_frame
        self.num_frames = num_frames
        duration = Float64(num_frames) / buf.sample_rate
        self.env.params.times[1] = duration - 0.06
        self.envTrigger = True

    def next(mut self, buf: SIMDBuffer[Self.num_chans]) -> MFloat[Self.num_chans]:

        env = self.env.next(self.envTrigger)

        if not self.env.is_active:
            return 0.0

        out = self.player.next[num_chans=Self.num_chans,interp=Interp.none](
            buf=buf, 
            rate=1.0, 
            loop=False, 
            trig=self.envTrigger, 
            start_frame=self.start_frame, 
            num_frames=self.num_frames
            )

        # self.world[].print("grain player out=", out,"env=", env, "start=", self.start_frame, "num=", self.num_frames)

        out *= env
        self.envTrigger = False
        return out

    def is_available(self) -> Bool:
        return not self.env.is_active

struct MPlotExample(Movable, Copyable):
    var world: World
    var buf: SIMDBuffer[1]
    var grains: List[MPlotExampleGrain[1]]
    var m: Messenger
    var play_data: List[Int]
    var path: String

    def __init__(out self, world: World):
        self.world = world
        self.grains = List[MPlotExampleGrain[1]](length=30, fill=MPlotExampleGrain[1](world))
        self.m = Messenger(self.world)
        self.play_data = List[Int](length=2, fill=0)
        self.path = String("/Users/ted/Desktop/all_flucoma.wav")
        self.buf = SIMDBuffer[1].load(self.path)

    def next(mut self) -> MFloat[2]:

        if self.m.notify_update("load_sound", self.path):
            self.buf = self.buf.load(self.path)

        trig = self.m.notify_update("play_data", self.play_data)

        if trig:
            print("Playing slice: start=",self.play_data[0], ", num=", self.play_data[1])
            for i in range(len(self.grains)):
                if self.grains[i].is_available():
                    print("Starting grain ", i)
                    self.grains[i].start(self.buf, self.play_data[0], self.play_data[1])
                    break

        out = MFloat[2](0.0)
        for ref g in self.grains:
            out += g.next(self.buf)

        return out