Skip to content

For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview.

Record

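An example showing how to record audio input from a microphone into a buffer and play it back. Recording starts on a MIDI note-on message and stops on the matching note-off message (note 48), at which point playback of the recorded audio begins.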

Python Code

if True:
    from mmm_python.MMMAudio import *

    # presumably reports the available audio devices so the names below can
    # be chosen from them
    list_audio_devices()

    # pick the audio interface used for input and for output:
    in_device = "Fireface UCX II (24219339)"
    out_device = "Fireface UCX II (24219339)"

    # alternative: built-in laptop devices
    # in_device = "MacBook Pro Microphone"
    # out_device = "External Headphones"

    # build the audio engine and load the "Record" graph from the examples package
    mmm_audio = MMMAudio(
        128,
        num_input_channels=12,
        num_output_channels=2,
        in_device=in_device,
        out_device=out_device,
        graph_name="Record",
        package_name="examples",
    )

    # Record_Synth reads input channel 0 by default; send a different index to change it
    mmm_audio.send_int("set_input_chan", 0)
    mmm_audio.start_audio()

# Manual control without a MIDI keyboard: run these two lines one at a time.
# Sending True starts recording into the buffer.
mmm_audio.send_bool("is_recording", True)
# Sending False stops recording and starts playback of what was recorded.
mmm_audio.send_bool("is_recording", False)

# this program is looking for midi note_on and note_off from note 48, so we prepare the keyboard to send messages to mmm_audio:
if True:
    import mido
    import time
    import threading
    from mmm_python.python_utils import *

    # find your midi devices (printed explicitly so the list is also visible
    # when this is run as a script rather than typed into a REPL)
    print(mido.get_input_names())

    # open your midi device - you may need to change the device name
    in_port = mido.open_input('Oxygen Pro Mini USB MIDI')

    # Create stop event
    stop_event = threading.Event()

    def start_midi():
        """Poll the MIDI input port and forward note 48 to the audio graph.

        Runs until ``stop_event`` is set. Pressing note 48 sends
        ``is_recording = True``; releasing it sends ``is_recording = False``.
        """
        while not stop_event.is_set():
            for msg in in_port.iter_pending():
                if stop_event.is_set():  # Check if we should stop
                    return
                print("Received MIDI message:", end=" ")
                print(msg)

                if msg.type in ("note_on", "note_off") and msg.note == 48:
                    # Many keyboards signal key release as note_on with
                    # velocity 0 instead of note_off; treat both as a release
                    # so recording always stops when the key is lifted.
                    key_down = msg.type == "note_on" and msg.velocity > 0
                    mmm_audio.send_bool("is_recording", key_down)
            # short sleep between polls so the loop does not spin the CPU
            time.sleep(0.01)

    # Start the thread
    midi_thread = threading.Thread(target=start_midi, daemon=True)
    midi_thread.start()

# To stop the thread:
stop_event.set()
# wait briefly for the polling loop to notice the stop event, then release
# the MIDI port so it can be reopened later in the same session
midi_thread.join(timeout=1.0)
in_port.close()

mmm_audio.stop_audio()

Mojo Code

from mmm_audio import *

import time

struct Record_Synth(Representable, Movable, Copyable):
    # Records one audio input channel into a fixed-length buffer and plays the
    # recording back. Recording is driven by the "is_recording" message and the
    # input channel is chosen with the "set_input_chan" message.
    var world: UnsafePointer[MMMWorld]
    var buf_dur: Float64          # buffer length in seconds
    var buffer: Recorder          # holds the recorded samples
    var is_recording: Bool        # True while samples are being written
    var is_playing: Float64       # output gain: 0.0 while recording, 1.0 during playback
    var playback_speed: Float64
    var trig: Bool                # passed to Play.next() as its trigger argument
    var write_pos: Int64          # next frame index to write into the buffer
    var play_buf: Play
    var note_time: Float64        # start timestamp while recording, recorded duration afterwards
    var num_frames: Int64         # number of frames to play back
    var input_chan: Int64         # index into world[].sound_in
    var messenger: Messenger

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        self.world = world
        self.buf_dur = 10.0  # seconds
        self.buffer = Recorder(self.world, Int64(self.world[].sample_rate*self.buf_dur), self.world[].sample_rate)
        self.is_recording = False
        self.is_playing = 0.0
        self.trig = False
        self.playback_speed = 1.0
        self.play_buf = Play(self.world)
        self.write_pos = 0
        self.note_time = 0.0
        self.num_frames = 0
        self.input_chan = 0
        self.messenger = Messenger(self.world)

    fn __repr__(self) -> String:
        return String("Record_Synth")

    # Begin a new recording: remember the start time, rewind the write
    # position and mute playback.
    fn start_recording(mut self):
        self.note_time = time.perf_counter()
        self.write_pos = 0
        self.is_recording = True
        self.is_playing = 0.0
        self.trig = False
        print("Recording started")

    # End the current recording and switch to playback. The recorded length is
    # derived from elapsed wall-clock time, capped at the buffer duration.
    fn stop_recording(mut self):
        self.note_time = min(time.perf_counter() - self.note_time, self.buf_dur)
        self.num_frames = Int64(self.note_time*self.world[].sample_rate)
        # snap the duration to a whole number of frames
        self.note_time = Float64(self.num_frames) / self.world[].sample_rate
        self.is_recording = False
        self.is_playing = 1.0
        self.trig = True
        self.write_pos = 0
        print("Recorded duration:", self.note_time, "seconds")
        print("Recording stopped. Now playing.")

    # Process one sample: handle pending messages, write the input sample while
    # recording, and return the (enveloped) playback sample.
    fn next(mut self) -> SIMD[DType.float64, 1]:
        if self.messenger.notify_update(self.input_chan,"set_input_chan"):
            # a channel is invalid if it is below zero OR past the last input
            # channel (the previous `and` could never be true, so bad indices
            # were silently accepted)
            if self.input_chan < 0 or self.input_chan >= self.world[].num_in_chans:
                print("Input channel out of range, resetting to 0")
                self.input_chan = 0

        notified = self.messenger.notify_update(self.is_recording,"is_recording")
        if notified and self.is_recording:
            self.start_recording()
        elif notified and not self.is_recording:
            self.stop_recording()

        # this code does the actual recording, placing the next sample into the buffer
        # the input channel is selected by self.input_chan (set with "set_input_chan")
        if self.is_recording:
            # the sound_in List in the world holds the audio in data for the current sample, so grab it from there.
            self.buffer.write(self.world[].sound_in[self.input_chan], self.write_pos)
            self.write_pos += 1
            if self.write_pos >= Int(self.buffer.buf.num_frames):
                # NOTE(review): num_frames and note_time are not updated in this
                # branch, so playback uses whatever they held before until the
                # next "is_recording" False message runs stop_recording() -
                # confirm this is intended.
                self.is_recording = False
                print("Recording stopped: buffer full")
                self.is_playing = 1.0
                self.trig = True
                self.write_pos = 0

        out = self.play_buf.next(self.buffer.buf, self.playback_speed, True, self.trig, start_frame = 0, num_frames = self.num_frames)

        env = min_env(self.play_buf.get_relative_phase(), self.note_time, 0.01)

        # is_playing acts as a gate: 0.0 silences the output while recording
        out = out * self.is_playing * env

        return out

struct Record(Representable, Movable, Copyable):
    # Top-level graph of the example: owns one Record_Synth and forwards its
    # output sample as the two-channel result.
    var world: UnsafePointer[MMMWorld]

    var synth: Record_Synth

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        # the synth shares the same world pointer as the graph
        self.synth = Record_Synth(world)
        self.world = world

    fn __repr__(self) -> String:
        return String("Record")

    fn next(mut self) -> SIMD[DType.float64, 2]:
        # pull one sample from the synth; it is converted to the 2-wide
        # return type on return
        mono = self.synth.next()
        return mono