Skip to content

For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview

Record

Python Code

if True:
    from mmm_src.MMMAudio import *
    list_audio_devices()

    in_device = "Fireface UFX+ (24082112)"
    out_device = "Fireface UFX+ (24082112)"

    # in_device = "MacBook Pro Microphone"
    # out_device = "External Headphones"


    # instantiate and load the graph
    mmm_audio = MMMAudio(128, num_input_channels=12, num_output_channels=2, in_device=in_device, out_device=out_device, graph_name="Record", package_name="examples")

    # the default input channel (in the Record_Synth) is 0, but you can change it
    mmm_audio.send_int("set_input_chan", 8) 
    mmm_audio.start_audio() 

mmm_audio.send_bool("is_recording", True)
mmm_audio.send_bool("is_recording", False)

# this program is looking for midi note_on and note_off from note 48, so we prepare the keyboard to send messages to mmm_audio:
if True:
    import mido
    import time
    import threading
    from mmm_utils.functions import *

    # find your midi devices
    mido.get_input_names()

    # open your midi device - you may need to change the device name
    in_port = mido.open_input('Oxygen Pro Mini USB MIDI')

    # Create stop event
    stop_event = threading.Event()
    def start_midi():
        while not stop_event.is_set():
            for msg in in_port.iter_pending():
                if stop_event.is_set():  # Check if we should stop
                    return
                print("Received MIDI message:", end=" ")
                print(msg)

                if msg.type == "note_on" and msg.note == 48:
                    mmm_audio.send_bool("is_recording", True)
                elif msg.type == "note_off" and msg.note == 48:
                    mmm_audio.send_bool("is_recording", False)
            time.sleep(0.01)

    # Start the thread
    midi_thread = threading.Thread(target=start_midi, daemon=True)
    midi_thread.start()

# To stop the thread:
stop_event.set()

mmm_audio.stop_audio()

Mojo Code

from mmm_src.MMMWorld import MMMWorld
from mmm_utils.Messenger import *
from mmm_utils.functions import *
from mmm_src.MMMTraits import *
from mmm_dsp.Buffer import *
from mmm_dsp.RecordBuf import RecordBuf
from mmm_dsp.PlayBuf import PlayBuf
from mmm_dsp.Env import min_env

import time
from math import floor

from mmm_dsp.Osc import Osc

struct Record_Synth(Representable, Movable, Copyable):
    var world: UnsafePointer[MMMWorld]
    var buf_dur: Float64
    var buffer: Buffer
    var is_recording: Bool
    var is_playing: Float64
    var playback_speed: Float64
    var trig: Bool
    var write_pos: Int64 
    var record_buf: RecordBuf
    var play_buf: PlayBuf
    var note_time: Float64
    var num_frames: Float64
    var input_chan: Int64
    var messenger: Messenger

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        self.world = world
        self.buf_dur = 10.0  # seconds
        self.buffer = Buffer(1, Int64(self.world[].sample_rate*self.buf_dur), self.world[].sample_rate)
        self.is_recording = False
        self.is_playing = 0.0
        self.trig = False
        self.playback_speed = 1.0
        self.record_buf = RecordBuf(world)
        self.play_buf = PlayBuf(world)
        self.write_pos = 0
        self.note_time = 0.0
        self.num_frames = 0
        self.input_chan = 0
        self.messenger = Messenger(world)

    fn __repr__(self) -> String:
        return String("Record_Synth")

    fn start_recording(mut self):
        self.note_time = time.perf_counter()
        self.write_pos = 0
        self.is_recording = True
        self.is_playing = 0.0
        self.trig = False
        print("Recording started")

    fn stop_recording(mut self):
        self.note_time = min(time.perf_counter() - self.note_time, self.buf_dur)
        self.num_frames = floor(self.note_time*self.world[].sample_rate)
        self.note_time = self.num_frames / self.world[].sample_rate
        self.is_recording = False
        self.is_playing = 1.0
        self.trig = True
        self.write_pos = 0
        print(self.note_time, self.num_frames/self.world[].sample_rate)
        print("Recorded duration:", self.note_time, "seconds")
        print("Recording stopped. Now playing.")

    fn next(mut self) -> SIMD[DType.float64, 1]:
        if self.messenger.notify_update(self.input_chan,"set_input_chan"):
            if self.input_chan < 0 and self.input_chan >= self.world[].num_in_chans:
                print("Input channel out of range, resetting to 0")
                self.input_chan = 0

        notified = self.messenger.notify_update(self.is_recording,"is_recording")
        if notified and self.is_recording:
            self.start_recording()
        elif notified and not self.is_recording:
            self.stop_recording()

        # this code does the actual recording, placing the next sample into the buffer
        # my audio interface has audio in on channel 9, so I use self.world[].sound_in[8]
        if self.is_recording:
            # the sound_in List in the world holds the audio in data for the current sample, so grab it from there.
            self.buffer.write(self.world[].sound_in[self.input_chan], self.write_pos)
            self.write_pos += 1
            if self.write_pos >= Int(self.buffer.num_frames):
                self.is_recording = False
                print("Recording stopped: buffer full")
                self.is_playing = 1.0
                self.trig = True
                self.write_pos = 0

        out = self.play_buf.next(self.buffer, 0, self.playback_speed, True, self.trig, start_frame = 0, num_frames = self.num_frames)

        env = min_env(self.play_buf.get_win_phase(), self.note_time, 0.01)

        out = out * self.is_playing * env

        return out

struct Record(Representable, Movable, Copyable):
    var world: UnsafePointer[MMMWorld]

    var synth: Record_Synth

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        self.world = world
        self.synth = Record_Synth(self.world)

    fn __repr__(self) -> String:
        return String("Record")

    fn next(mut self) -> SIMD[DType.float64, 2]:
        return self.synth.next()