For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview.
Record¶
Python Code¶
# Audio setup for the Record example.
# NOTE(review): the `if True:` wrapper appears to exist so the whole setup can
# be sent to an interactive interpreter as a single statement — confirm.
if True:
    from mmm_src.MMMAudio import *
    # Presumably reports the available audio devices so the names below can be
    # matched against them — verify against mmm_src.MMMAudio.
    list_audio_devices()
    # Device names are machine-specific; replace them with names from your own system.
    in_device = "Fireface UFX+ (24082112)"
    out_device = "Fireface UFX+ (24082112)"
    # in_device = "MacBook Pro Microphone"
    # out_device = "External Headphones"
    # instantiate and load the graph
    mmm_audio = MMMAudio(128, num_input_channels=12, num_output_channels=2, in_device=in_device, out_device=out_device, graph_name="Record", package_name="examples")
    # the default input channel (in the Record_Synth) is 0, but you can change it
    mmm_audio.send_int("set_input_chan", 8)
    mmm_audio.start_audio()

# NOTE(review): indentation was lost in this listing; the two calls below are
# assumed to sit outside the `if True:` block so they can be run one at a time.
# Sending True starts a recording in Record_Synth ...
mmm_audio.send_bool("is_recording", True)
# ... and sending False stops it, after which the synth plays the take back.
mmm_audio.send_bool("is_recording", False)
# This program listens for MIDI note_on and note_off messages for note 48, so we set up the keyboard to forward those messages to mmm_audio:
# MIDI control for the Record example: a background thread forwards note 48
# on/off messages to the audio graph as "is_recording" True/False.
if True:
    import mido
    import time
    import threading
    from mmm_utils.functions import *
    # find your midi devices
    mido.get_input_names()
    # open your midi device - you may need to change the device name
    in_port = mido.open_input('Oxygen Pro Mini USB MIDI')
    # Create stop event
    stop_event = threading.Event()

    def start_midi():
        """Poll `in_port` until `stop_event` is set, mapping note 48 to recording on/off."""
        while not stop_event.is_set():
            for msg in in_port.iter_pending():
                if stop_event.is_set():  # Check if we should stop
                    return
                print("Received MIDI message:", end=" ")
                print(msg)
                # note_on for note 48 starts recording; note_off for note 48 stops it.
                if msg.type == "note_on" and msg.note == 48:
                    mmm_audio.send_bool("is_recording", True)
                elif msg.type == "note_off" and msg.note == 48:
                    mmm_audio.send_bool("is_recording", False)
            # NOTE(review): indentation was lost in this listing; the sleep is
            # assumed to run once per polling pass (inside `while`, after `for`).
            time.sleep(0.01)

    # Start the thread
    # daemon=True: the thread will not keep the interpreter alive at exit.
    midi_thread = threading.Thread(target=start_midi, daemon=True)
    midi_thread.start()

# NOTE(review): the teardown lines below are assumed to sit outside the
# `if True:` block so they can be run manually when finished.
# To stop the thread:
stop_event.set()
mmm_audio.stop_audio()
Mojo Code¶
from mmm_src.MMMWorld import MMMWorld
from mmm_utils.Messenger import *
from mmm_utils.functions import *
from mmm_src.MMMTraits import *
from mmm_dsp.Buffer import *
from mmm_dsp.RecordBuf import RecordBuf
from mmm_dsp.PlayBuf import PlayBuf
from mmm_dsp.Env import min_env
import time
from math import floor
from mmm_dsp.Osc import Osc
struct Record_Synth(Representable, Movable, Copyable):
    """Records one input channel into a fixed-length buffer, then plays the take back.

    Recording is toggled by the "is_recording" message; the input channel is
    chosen with the "set_input_chan" message. When a recording stops, the
    recorded span is played back through `play_buf`, shaped by an envelope
    from `min_env`.
    """
    var world: UnsafePointer[MMMWorld]
    var buf_dur: Float64          # buffer length in seconds
    var buffer: Buffer
    var is_recording: Bool
    var is_playing: Float64       # used as a gain: 0.0 silences the output, 1.0 passes it
    var playback_speed: Float64
    var trig: Bool                # passed to play_buf.next as its trigger argument
    var write_pos: Int64          # next buffer frame to write while recording
    var record_buf: RecordBuf
    var play_buf: PlayBuf
    var note_time: Float64        # start timestamp while recording; recorded duration (s) after stop
    var num_frames: Float64       # number of recorded frames handed to playback
    var input_chan: Int64         # index into world.sound_in to record from
    var messenger: Messenger

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        """Allocate a one-channel buffer of `buf_dur` seconds at the world sample rate.

        Args:
            world: Pointer to the shared MMMWorld (sample rate, input samples, ...).
        """
        self.world = world
        self.buf_dur = 10.0  # seconds
        self.buffer = Buffer(1, Int64(self.world[].sample_rate*self.buf_dur), self.world[].sample_rate)
        self.is_recording = False
        self.is_playing = 0.0
        self.trig = False
        self.playback_speed = 1.0
        self.record_buf = RecordBuf(world)
        self.play_buf = PlayBuf(world)
        self.write_pos = 0
        self.note_time = 0.0
        self.num_frames = 0
        self.input_chan = 0
        self.messenger = Messenger(world)

    fn __repr__(self) -> String:
        return String("Record_Synth")

    fn start_recording(mut self):
        """Begin a new take: remember the start time, rewind the write head, mute playback."""
        self.note_time = time.perf_counter()
        self.write_pos = 0
        self.is_recording = True
        self.is_playing = 0.0
        self.trig = False
        print("Recording started")

    fn stop_recording(mut self):
        """End the take and switch to playback of the recorded span.

        The elapsed time is clamped to `buf_dur` and then snapped to a whole
        number of frames, so `note_time` and `num_frames` describe the same span.
        """
        self.note_time = min(time.perf_counter() - self.note_time, self.buf_dur)
        self.num_frames = floor(self.note_time*self.world[].sample_rate)
        self.note_time = self.num_frames / self.world[].sample_rate
        self.is_recording = False
        self.is_playing = 1.0
        self.trig = True
        self.write_pos = 0
        print(self.note_time, self.num_frames/self.world[].sample_rate)
        print("Recorded duration:", self.note_time, "seconds")
        print("Recording stopped. Now playing.")

    fn next(mut self) -> SIMD[DType.float64, 1]:
        """Process one sample: handle messages, record if active, return the playback sample."""
        if self.messenger.notify_update(self.input_chan,"set_input_chan"):
            # Fixed: this check used `and`, which can never be true (a value cannot be
            # both negative and >= num_in_chans), so a bad channel was never reset.
            if self.input_chan < 0 or self.input_chan >= self.world[].num_in_chans:
                print("Input channel out of range, resetting to 0")
                self.input_chan = 0

        notified = self.messenger.notify_update(self.is_recording,"is_recording")
        if notified and self.is_recording:
            self.start_recording()
        elif notified and not self.is_recording:
            self.stop_recording()

        # this code does the actual recording, placing the next sample into the buffer
        # the channel is chosen with the "set_input_chan" message (e.g. 8 for the 9th input of an interface)
        if self.is_recording:
            # the sound_in List in the world holds the audio in data for the current sample, so grab it from there.
            self.buffer.write(self.world[].sound_in[self.input_chan], self.write_pos)
            self.write_pos += 1
            if self.write_pos >= Int(self.buffer.num_frames):
                # NOTE(review): unlike stop_recording(), this path does not update
                # num_frames or note_time, so playback uses whatever they held before
                # (note_time is still the start timestamp) — confirm this is intended.
                self.is_recording = False
                print("Recording stopped: buffer full")
                self.is_playing = 1.0
                self.trig = True
                self.write_pos = 0

        out = self.play_buf.next(self.buffer, 0, self.playback_speed, True, self.trig, start_frame = 0, num_frames = self.num_frames)
        env = min_env(self.play_buf.get_win_phase(), self.note_time, 0.01)
        # is_playing acts as a gate so nothing is heard while recording
        out = out * self.is_playing * env
        return out
struct Record(Representable, Movable, Copyable):
    """Top-level graph for the Record example; it owns a single Record_Synth."""
    var world: UnsafePointer[MMMWorld]
    var synth: Record_Synth

    fn __init__(out self, world: UnsafePointer[MMMWorld]):
        """Store the world pointer and build the synth from it.

        Args:
            world: Pointer to the shared MMMWorld.
        """
        self.world = world
        self.synth = Record_Synth(world)

    fn __repr__(self) -> String:
        return String("Record")

    fn next(mut self) -> SIMD[DType.float64, 2]:
        """Return the synth's next sample as this graph's two-channel output."""
        mono = self.synth.next()
        return mono