For more information about the examples, such as how the Python and Mojo files interact with each other, see the Examples Overview.
MidiSequencer¶
Python Code¶
# Boot the audio engine with the "MidiSequencer" graph and send the initial synth settings.
if True:
from mmm_src.MMMAudio import MMMAudio
# instantiate and load the graph
mmm_audio = MMMAudio(128, graph_name="MidiSequencer", package_name="examples")
mmm_audio.start_audio()
# this next chunk of code is all about using a midi keyboard to control the synth---------------
# the python host grabs the midi and sends the midi messages to the mojo audio engine
mmm_audio.send_float("filt_freq", 2000.0) # initial filter frequency
mmm_audio.send_float("bend_mul", 1.2) # initial pitch-bend multiplier (the Mojo voices multiply their note frequency by this)
# trigger voice 0 directly; the list is [note frequency, velocity scaled 0 to 1]
mmm_audio.send_floats("voice_0.note", [1000.0, 1.0])
def midi_func():
    """Start a background thread that forwards MIDI input to the audio engine.

    Prints the available MIDI input names, opens the input port, and polls it
    from a daemon thread until the module-level ``stop_event`` is set:

    * ``note_on`` (velocity > 0): sends ``[frequency, velocity / 127]`` to the
      next voice (``voice_0`` .. ``voice_7``) taken from a ``Pseq``.
    * ``control_change`` on CC 34: sends ``filt_freq``, scaled exponentially
      from 100 to 4000.
    * ``pitchwheel``: sends ``bend_mul``, scaled linearly from 0.9375 to 1.0625.

    Side effects: (re)binds the global ``stop_event`` to a fresh
    ``threading.Event``; call ``stop_event.set()`` to stop the thread. The
    MIDI port is closed when the thread exits, so the function can be called
    again afterwards.
    """
    import threading
    import time

    import mido

    from mmm_src.Patterns import Pseq
    from mmm_utils.functions import linexp, linlin, midicps

    global stop_event

    # find your midi devices - printed so the right name can be used below
    print(mido.get_input_names())
    # open your midi device - you may need to change the device name
    in_port = mido.open_input('Oxygen Pro Mini USB MIDI')
    voice_seq = Pseq(list(range(8)))

    # Create stop event
    stop_event = threading.Event()

    def start_midi():
        try:
            while not stop_event.is_set():
                for msg in in_port.iter_pending():
                    if stop_event.is_set():  # Check if we should stop
                        return
                    if msg.type == "note_on":
                        # By MIDI convention a note_on with velocity 0 is a
                        # note-off. The synth has no note-off handling, so
                        # forwarding it would only use up a voice and
                        # retrigger it at zero amplitude.
                        if msg.velocity == 0:
                            continue
                        voice = "voice_" + str(voice_seq.next())
                        print(f"Note On: {msg.note} Velocity: {msg.velocity} Voice: {voice}")
                        # note freq and velocity scaled 0 to 1
                        mmm_audio.send_floats(voice + ".note", [midicps(msg.note), msg.velocity / 127.0])
                    elif msg.type == "control_change":
                        # NOTE(review): the original comment calls CC 34 the mod
                        # wheel; the standard mod wheel is CC 1, so adjust this
                        # number to whatever your controller sends.
                        if msg.control == 34:
                            # on the desired cc, scale the value exponentially from 100 to 4000
                            # it is best practice to scale midi cc values in the host, rather than in the audio engine
                            mmm_audio.send_float("filt_freq", linexp(msg.value, 0, 127, 100, 4000))
                    elif msg.type == "pitchwheel":
                        mmm_audio.send_float("bend_mul", linlin(msg.pitch, -8192, 8191, 0.9375, 1.0625))
                # short pause between polls so the loop does not spin
                time.sleep(0.01)
        finally:
            # release the device so it can be reopened later
            in_port.close()

    # Start the thread
    midi_thread = threading.Thread(target=start_midi, daemon=True)
    midi_thread.start()
# start forwarding MIDI input in a background thread
# NOTE(review): these statements look intended to be run one at a time; run straight
# through, the thread is told to stop right after it starts - confirm intended usage
midi_func()
# To stop the midi thread defined above:
stop_event.set()
# this chunk of code shows how to use the sequencer to trigger notes in the mmm_audio engine
# the scheduler can also sequence notes
from mmm_src.Patterns import Pseq, Pxrand
import numpy as np
import asyncio
from mmm_utils.functions import midicps, cpsmidi
# a `global` statement at module level has no effect in Python
global scheduler
scheduler = mmm_audio.scheduler
# module-level voice sequence over indices 0..7; trig_synth reads it to pick the voice for each note
voice_seq = Pseq(list(range(8)))
# advances the sequence once; the returned value is discarded
voice_seq.next()
async def trig_synth(wait):
    """Endlessly send notes built from multiples of a changing fundamental.

    A fundamental is drawn from MIDI notes 36, 37, 43 and 42 (via ``Pxrand``)
    and a run length from 7, 11, 13 or 17. The coroutine then sends
    ``fundamental * multiplier`` for each multiplier taken from a ``Pseq`` over
    ``1..run length``, pausing ``wait`` seconds after every note. Each note
    goes to the voice named by the module-level ``voice_seq`` at a fixed
    velocity of 100 / 127. When a run is finished, a new fundamental and run
    length are drawn and the process repeats forever.

    Args:
        wait: Seconds to sleep between consecutive notes.
    """
    run_lengths = [7, 11, 13, 17]

    count_to = np.random.choice(run_lengths).item()
    mult_seq = Pseq(list(range(1, count_to + 1)))
    fund_seq = Pxrand([36, 37, 43, 42])
    fund = midicps(fund_seq.next())

    sent = 0  # notes sent in the current run
    while True:
        target = "voice_" + str(voice_seq.next()) + ".note"
        # print(f"Sequencer Note: {cpsmidi(fund * mult_seq.current())} Voice: {voice}")
        freq = fund * mult_seq.next()
        # note freq and velocity scaled 0 to 1
        mmm_audio.send_floats(target, [freq, 100 / 127.0])
        await asyncio.sleep(wait)

        sent += 1
        if sent != count_to:
            continue

        # run complete: start over with a new fundamental and run length
        sent = 0
        fund = midicps(fund_seq.next())
        count_to = np.random.choice(run_lengths).item()
        mult_seq = Pseq(list(range(1, count_to + 1)))
# schedule the coroutine on the engine's scheduler; trig_synth(0.1) sleeps 0.1 seconds between notes
rout = scheduler.sched(trig_synth(0.1))
# NOTE(review): cancelling directly after scheduling only makes sense when these lines are run one at a time - confirm
rout.cancel() # stop just this routine
# stop all routines
scheduler.stop_routs() # you can also stop the routines with ctl-C in the terminal
# stop the audio engine, then start it again
mmm_audio.stop_audio()
mmm_audio.start_audio()
Mojo Code¶
from mmm_src.MMMWorld import *
from mmm_utils.Messenger import *
from mmm_utils.functions import *
from mmm_dsp.Osc import *
from mmm_dsp.Filters import *
from mmm_dsp.Env import *
from mmm_src.MMMTraits import *
# Synth Voice - Below is a polyphonic synth. The first struct, TrigSynthVoice, is a single voice of the synth. Each voice is made up of a modulator oscillator, a carrier oscillator, a sub oscillator, and an envelope generator.
# One voice of the synth: a modulator oscillator feeding a carrier oscillator, plus a sub
# oscillator, all shaped by one envelope. The voice listens for "note" messages under its
# own messenger name space and returns one sample per call to next().
struct TrigSynthVoice(Movable, Copyable):
var world: UnsafePointer[MMMWorld] # Pointer to the MMMWorld instance
var env_params: EnvParams # parameters handed to env.next() on every sample
var env: Env # envelope that scales both the modulation amount and the output level
var mod: Osc # modulator oscillator
var car: Osc[1, 0, 0] # carrier oscillator; receives the scaled modulator output
var sub: Osc # sub oscillator, run at half the carrier frequency
var bend_mul: Float64 # multiplied with the note frequency in next(); overwritten by TrigSynth.next()
var note: List[Float64] # read in next() as [frequency, velocity]
var messenger: Messenger # source of this voice's "note" messages
# name_space is passed to the Messenger; TrigSynth passes "voice_0", "voice_1", ...
fn __init__(out self, world: UnsafePointer[MMMWorld], name_space: String = ""):
self.world = world
self.mod = Osc(self.world)
self.car = Osc[1, 0, 0](self.world)
self.sub = Osc(self.world)
# NOTE(review): the three lists look like envelope levels, segment times and a curve value - confirm against EnvParams
self.env_params = EnvParams([0.0, 1.0, 0.75, 0.75, 0.0], [0.01, 0.1, 0.2, 0.5], [1.0])
self.env = Env(self.world)
self.bend_mul = 1.0
self.messenger = Messenger(self.world, name_space)
# starts empty; next() only indexes it once a note has arrived or the envelope is active
self.note = List[Float64]()
# Compute and return the next output sample of this voice.
@always_inline
fn next(mut self) -> Float64:
# presumably True when a new "note" message arrived and was written into self.note - confirm against Messenger.notify_update
make_note = self.messenger.notify_update(self.note, "note")
# if there is no trigger and the envelope is not active, that means the voice should be silent - output 0.0
if not self.env.is_active and not make_note:
return 0.0
else:
# NOTE(review): assumes self.note holds at least [frequency, velocity] whenever this branch runs - confirm
bend_freq = self.note[0] * self.bend_mul
var mod_value = self.mod.next(bend_freq * 1.5) # Modulator frequency is 1.5 times the (bent) carrier frequency
var env = self.env.next(self.env_params, make_note) # make_note is passed as the envelope trigger
var mod_mult = env * 0.5 * linlin(bend_freq, 1000, 4000, 1, 0) #decrease the mod amount as freq increases
var car_value = self.car.next(bend_freq, mod_value * mod_mult, osc_type=2)
car_value += self.sub.next(bend_freq * 0.5) # Add a sub oscillator one octave below the carrier
car_value = car_value * 0.1 * env * self.note[1] # Scale the output by the envelope and note velocity
return car_value
# Polyphonic synth: owns a list of TrigSynthVoice instances, sums their output, and runs the
# sum through a low-pass filter whose cutoff follows the "filt_freq" message.
struct TrigSynth(Movable, Copyable):
var world: UnsafePointer[MMMWorld] # Pointer to the MMMWorld instance
var voices: List[TrigSynthVoice]
var current_voice: Int64 # set to 0 in __init__; not read anywhere in this struct
# the messenger (imported from mmm_utils.Messenger.mojo) gets values from the MMMWorld message system
# in next() it is asked to update filt_freq and bend_mul below,
# which are then read as normal variables
var messenger: Messenger
var num_voices: Int64
var svf: SVF # filter applied to the summed voices
var filt_lag: Lag # applied to filt_freq before it reaches the filter
var filt_freq: Float64 # updated from the "filt_freq" message
var bend_mul: Float64 # updated from the "bend_mul" message and copied into every voice
# Build num_voices voices; voice i gets the messenger name space "voice_<i>".
fn __init__(out self, world: UnsafePointer[MMMWorld], num_voices: Int64 = 8):
self.world = world
self.num_voices = num_voices
self.current_voice = 0
self.messenger = Messenger(self.world)
self.voices = List[TrigSynthVoice]()
for i in range(self.num_voices):
self.voices.append(TrigSynthVoice(self.world, "voice_"+String(i)))
self.svf = SVF(self.world)
# NOTE(review): 0.1 is presumably the lag time - confirm against Lag
self.filt_lag = Lag(self.world, 0.1)
self.filt_freq = 1000.0
self.bend_mul = 1.0
# Compute the next sample: update the controls, sum all voices, filter and scale.
@always_inline
fn next(mut self) -> SIMD[DType.float64, 2]:
self.messenger.update(self.filt_freq, "filt_freq")
self.messenger.update(self.bend_mul, "bend_mul")
# self.world[].print(self.filt_freq, self.bend_mul)
# push the current bend multiplier to every voice, only at the top of a block
if self.world[].top_of_block:
for i in range(len(self.voices)):
self.voices[i].bend_mul = self.bend_mul
var out = 0.0
# get the output of all the synths
for i in range(len(self.voices)):
out += self.voices[i].next()
# low-pass the sum with the lagged cutoff, then scale by 0.6
# NOTE(review): the third argument (2.0) is presumably the filter resonance/Q - confirm against SVF.lpf
out = self.svf.lpf(out, self.filt_lag.next(self.filt_freq), 2.0) * 0.6
# NOTE(review): out is a scalar while the return type has two lanes; presumably it is copied to both - confirm
return out
# Top-level graph for this example: wraps one TrigSynth and returns its output from next().
struct MidiSequencer(Representable, Movable, Copyable):
var world: UnsafePointer[MMMWorld]
var output: List[Float64] # initialised to two zeros; not read anywhere in this struct
var trig_synth: TrigSynth # the polyphonic synth that produces this graph's audio
fn __init__(out self, world: UnsafePointer[MMMWorld]):
self.world = world
self.output = List[Float64](0.0, 0.0) # Initialize output list
self.trig_synth = TrigSynth(world) # Initialize the TrigSynth with the world instance
# Returns the fixed name "Midi_Sequencer".
fn __repr__(self) -> String:
return String("Midi_Sequencer")
# Returns the two-lane sample produced by the TrigSynth.
fn next(mut self: MidiSequencer) -> SIMD[DType.float64, 2]:
return self.trig_synth.next() # Return the combined output sample