For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview
Record¶
An example showing how to record audio input from a microphone to a buffer and play it back using MIDI note messages.
Python Code¶
import sys
from pathlib import Path
# This example is able to run by pressing the "play" button in VSCode
# that executes the whole file.
# In order to do this, it needs to add the parent directory to the path
# (the next line here) so that it can find the mmm_src and mmm_utils packages.
# If you want to run it line by line in a REPL, skip this line!
sys.path.insert(0, str(Path(__file__).parent.parent))
from mmm_python import *
def main():
# set your audio input and output devices here:
in_device = "Fireface UCX II (24219339)"
out_device = "Fireface UCX II (24219339)"
# in_device = "MacBook Pro Microphone"
# out_device = "External Headphones"
# instantiate and load the graph
mmm_audio = MMMAudio(128, num_input_channels=18, num_output_channels=2, in_device=in_device, out_device=out_device, graph_name="Record", package_name="examples")
# the default input channel (in the Record_Synth) is 0, but you can change it
mmm_audio.send_int("set_input_chan", 16)
mmm_audio.start_audio()
import mido
import time
import threading
# find your midi devices
mido.get_input_names()
# open your midi device - you may need to change the device name
in_port = mido.open_input('Oxygen Pro Mini USB MIDI')
# Create stop event
stop_event = threading.Event()
def start_midi():
while not stop_event.is_set():
for msg in in_port.iter_pending():
if stop_event.is_set(): # Check if we should stop
return
print("Received MIDI message:", end=" ")
print(msg)
if msg.type == "note_on" and msg.note == 48:
mmm_audio.send_bool("is_recording", True)
elif msg.type == "note_off" and msg.note == 48:
mmm_audio.send_bool("is_recording", False)
time.sleep(0.01)
# Start the thread
midi_thread = threading.Thread(target=start_midi, daemon=False)
midi_thread.start()
if __name__ == "__main__":
main()
Mojo Code¶
from mmm_audio import *
struct Record(Representable, Movable, Copyable):
var world: World
var buf_dur: Float64
var buffer: Recorder[]
var is_recording: Bool
var is_playing: Float64
var trig: Bool
var play_buf: Play
var input_chan: Int
var messenger: Messenger
var note_time: Float64
fn __init__(out self, world: World):
self.world = world
self.buf_dur = 10.0 # seconds
self.buffer = Recorder(self.world, Int(self.world[].sample_rate*self.buf_dur), self.world[].sample_rate)
self.is_recording = False
self.is_playing = 0.0
self.trig = False
self.play_buf = Play(self.world)
self.input_chan = 0
self.messenger = Messenger(self.world)
self.note_time = 0.0
fn __repr__(self) -> String:
return String("Record_Synth")
fn start_recording(mut self):
self.buffer.write_head = 0
self.is_recording = True
self.is_playing = 0.0
self.trig = False
print("Recording started")
fn stop_recording(mut self):
self.is_recording = False
self.is_playing = 1.0
self.trig = True
self.note_time = Float64(self.buffer.write_head-1) / self.world[].sample_rate
print("Recorded duration:", self.note_time, "seconds")
print("Recording stopped. Now playing.")
fn next(mut self) -> MFloat[1]:
if self.messenger.notify_update(self.input_chan,"set_input_chan"):
if self.input_chan < 0 and self.input_chan >= self.world[].num_in_chans:
print("Input channel out of range, resetting to 0")
self.input_chan = 0
notified = self.messenger.notify_update(self.is_recording,"is_recording")
if notified and self.is_recording:
self.start_recording()
elif notified and not self.is_recording:
self.stop_recording()
# this code does the actual recording, placing the next sample into the buffer
# my audio interface has audio in on channel 9, so I use self.world[].sound_in[8]
if self.is_recording:
# the sound_in List in the world holds the audio in data for the current sample, so grab it from there.
self.buffer.write_next[False](self.world[].sound_in[self.input_chan]) #write the current sample to the buffer, without looping
if self.buffer.write_head >= Int(self.buffer.buf.num_frames-1):
self.stop_recording()
print("Recording stopped: buffer full")
out = self.play_buf.next(self.buffer.buf, 1.0, True, self.trig, start_frame = 0, num_frames = self.buffer.write_head-1)
env = min_env(self.play_buf.get_relative_phase(), self.note_time, 0.01)
out = out * self.is_playing * env
return out