> Welcome! This page provides a handy Python script for bridging two exciting decentralized communication protocols: Meshtastic and MeshCore. If you're running nodes on both networks and want them to be able to share public channel messages, this relay is for you.
> The script acts as a simple, bi-directional "universal translator." It listens for text messages on a designated public channel on one network, and when it hears one, it rebroadcasts it onto the public channel of the other.
[MT<>MC]
) is added to relayed messages to prevent a message from being endlessly bounced back and forth between the two networks.longName
to make conversations easier to follow.asyncio
, the script handles communication with both devices concurrently without performance issues.> The script operates by running two main functions in parallel:
meshtastic_to_meshcore_relay
: This function subscribes to incoming Meshtastic messages. When a new text message is received, it extracts the sender's name and the message content. It then formats a new message string, prepends the [MT<>MC]
identifier, and sends it to the MeshCore device.meshcore_to_meshtastic_relay
: This function polls the MeshCore device for new channel messages. When a message arrives, it similarly prepends the relay identifier and broadcasts it over the Meshtastic network using the connected radio.> To avoid relaying the same message multiple times, the script maintains a temporary cache of message IDs it has already processed from each network.
> !!!Please note, your Meshcore node needs the (companion radio USB) role to be flashed. The (companion radio Bluetooth) role is not supported by this script!!!
> You'll need Python 3.7 or newer. You can install the required libraries using pip. Python virtual environment (venv) is recommended:
$ pip install meshtastic meshcore pyserial
MT-MC_relay.py
).MESHTASTIC_PORT = "/dev/ttyUSB1"
MESHCORE_PORT = "/dev/ttyUSB0"
ls /dev/ttyUSB*
.COM3
, COM4
, etc./dev/tty.usbserial-*
.$ python MT-MC_relay.py
Ctrl+C
.> FILE: MT-MC_relay.py
# Relay for Meshtastic and MeshCore public channels.
import asyncio
import sys
from meshtastic import serial_interface
from meshtastic.protobuf.portnums_pb2 import PortNum
from meshcore import MeshCore, EventType
from pubsub import pub
import serial
# --- Configuration ---
MESHTASTIC_PORT = "/dev/ttyUSB1"
MESHCORE_PORT = "/dev/ttyUSB0"
RELAY_ID_PREFIX = "[MT<>MC]"
MAX_ID_CACHE_SIZE = 200
# --- State Management ---
meshtastic_relayed_ids = set()
meshcore_relayed_ids = set()
# --- Meshtastic-to-MeshCore Logic ---
async def meshtastic_to_meshcore_relay(meshtastic_interface, meshcore_instance):
"""Listens for Meshtastic messages and forwards them to MeshCore."""
loop = asyncio.get_running_loop()
queue = asyncio.Queue()
def on_meshtastic_message(packet, interface=None):
"""Synchronous callback that puts received packets onto an async queue."""
loop.call_soon_threadsafe(queue.put_nowait, packet)
pub.subscribe(on_meshtastic_message, "meshtastic.receive")
print("Meshtastic listener is active.")
while True:
try:
packet = await queue.get()
decoded = packet.get('decoded', {})
portnum = decoded.get('portnum')
is_text = (portnum == PortNum.TEXT_MESSAGE_APP or str(portnum) == 'TEXT_MESSAGE_APP')
if is_text:
message_text = decoded.get('text')
message_id = packet.get("id")
if not message_text or message_text.startswith(RELAY_ID_PREFIX) or message_id in meshtastic_relayed_ids:
continue
# Get the sender's name from the Meshtastic node list
from_id = packet.get('fromId')
sender_name = from_id # Default to the node ID if name is not found
if from_id in meshtastic_interface.nodes:
node_info = meshtastic_interface.nodes[from_id]
if 'user' in node_info and 'longName' in node_info['user']:
sender_name = node_info['user']['longName']
# Combine the sender's name and the message text
full_message = f"{sender_name}: {message_text}"
print(f"Meshtastic -> MeshCore: '{full_message}'")
if len(meshtastic_relayed_ids) > MAX_ID_CACHE_SIZE:
meshtastic_relayed_ids.clear()
meshtastic_relayed_ids.add(message_id)
relayed_text = f"{RELAY_ID_PREFIX} {full_message}"
# Based on the inspector output, 'send_chan_msg' is the correct method
# for sending a message to a specific channel index.
await meshcore_instance.commands.send_chan_msg(chan=0, msg=relayed_text)
print("DEBUG: MeshCore send_chan_msg command completed.")
except Exception as e:
print(f"Error in Meshtastic-to-MeshCore relay: {e}", file=sys.stderr)
await asyncio.sleep(1)
# --- MeshCore-to-Meshtastic Logic ---
async def meshcore_to_meshtastic_relay(meshtastic_interface, meshcore_instance):
"""Listens for MeshCore messages and forwards them to Meshtastic."""
loop = asyncio.get_running_loop()
async def handle_meshcore_message(event):
global meshcore_relayed_ids
msg = event.payload
message_text = msg.get("text", "")
sender_timestamp = msg.get("sender_timestamp")
unique_id = f"{sender_timestamp}-{message_text}"
if not message_text or message_text.startswith(RELAY_ID_PREFIX) or unique_id in meshcore_relayed_ids:
return
print(f"MeshCore -> Meshtastic: '{message_text}'")
if len(meshcore_relayed_ids) > MAX_ID_CACHE_SIZE:
meshcore_relayed_ids.clear()
meshcore_relayed_ids.add(unique_id)
relayed_text = f"{RELAY_ID_PREFIX} {message_text}"
await loop.run_in_executor(
None,
lambda: meshtastic_interface.sendText(relayed_text)
)
meshcore_instance.subscribe(EventType.CHANNEL_MSG_RECV, handle_meshcore_message)
await meshcore_instance.start_auto_message_fetching()
print("MeshCore listener is active.")
await asyncio.Event().wait()
# --- Main Execution ---
async def main():
meshtastic_interface = None
meshcore_instance = None
loop = asyncio.get_running_loop()
try:
print("--- Mesh Relay ---")
print(f"Connecting to Meshtastic radio on {MESHTASTIC_PORT}...")
# meshtastic-python is synchronous, so we initialize it in an executor.
meshtastic_interface = await loop.run_in_executor(
None, serial_interface.SerialInterface, MESHTASTIC_PORT
)
print("Meshtastic radio connected.")
print(f"Connecting to MeshCore radio on {MESHCORE_PORT}...")
meshcore_instance = await MeshCore.create_serial(MESHCORE_PORT)
print("MeshCore radio connected.")
# A simple, robust delay to allow both radios to settle after connection.
print("Waiting for radios to settle...")
await asyncio.sleep(5)
print("\nStarting relay bridges... Press Ctrl+C to stop.")
# Run both relay tasks concurrently.
await asyncio.gather(
meshtastic_to_meshcore_relay(meshtastic_interface, meshcore_instance),
meshcore_to_meshtastic_relay(meshtastic_interface, meshcore_instance)
)
except (serial.serialutil.SerialException, FileNotFoundError) as e:
print(f"\nFATAL ERROR: Could not connect to a radio. Please check your port configuration.", file=sys.stderr)
print(f" Details: {e}", file=sys.stderr)
except KeyboardInterrupt:
print("\nShutting down relay...")
except Exception as e:
print(f"\nAn unexpected error occurred: {e}", file=sys.stderr)
finally:
if meshtastic_interface:
meshtastic_interface.close()
print("Relay shutdown complete.")
if __name__ == "__main__":
asyncio.run(main())
> THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
> By using this script, you agree that you are doing so at your own risk.