Skip to content
Snippets Groups Projects
Select Git revision
  • 5f44ca1fab29243f3871e2a8ddfa1851a8fbc384
  • master default protected
2 results

UARTE_vs_Serial.ino

Blame
  • mpy_bridge.py 3.92 KiB
    # Quentin Bolsée 2023-10-01
    # MIT Center for Bits and Atoms
    #
    # This work may be reproduced, modified, distributed,
    # performed, and displayed for any purpose, but must
    # acknowledge this project. Copyright is retained and
    # must be preserved. The work is provided as is; no
    # warranty is provided, and users accept all liability.
    import os.path
    import serial
    import time
    import re
    import ast
    
    
    class MicroPythonError(RuntimeError):
        pass
    
    
    class Device:
        init_delay = 0.1
    
        def __init__(self, port, main_filename="main.py"):
            self.ser_obj = serial.Serial(port)
            self.function_args = {}
            self.function_callable = {}
            self.main_filename = main_filename
            self.main_module = main_filename.replace(".py", "")
            # initialize
            self.init()
            self.read_functions()
    
        def init(self):
            s = self.ser_obj
            s.write(b'\x04')
            s.write(b'\r\n\x03\r\n')
            s.write(b'\x01')
            time.sleep(self.init_delay)
            # clean slate
            s.flushInput()
            # make sure main file is imported
            self.run(f'import {self.main_module}')
    
        def run(self, cmd, show=False, end="\n"):
            s = self.ser_obj
            s.write((cmd+end).encode("utf-8"))
            s.write(b'\x04')  # ^d reset
    
            # >OK<RETURN>\x04
            txt_ret = s.read_until(b"\x04")[3:-1].decode("utf-8")
    
            # <ERROR>\x04
            txt_err = s.read_until(b"\x04")[:-1].decode("utf-8")
    
            if len(txt_err) > 0:
                raise MicroPythonError(txt_err)
    
            if show:
                print(f"RETURN: '{txt_ret.rstrip()}'")
    
            return txt_ret.rstrip()
    
        def run_func(self, func_name, *args, **kwargs):
            args_list = list(repr(x) for x in args)
            kwargs_list = list(f"{a}={repr(b)}" for a, b in kwargs.items())
            cmd_txt = f"print(repr({self.main_module}.{func_name}({','.join(args_list+kwargs_list)})))"
            ret_txt = self.run(cmd_txt)
            return ast.literal_eval(ret_txt)
    
        def read_functions(self):
            try:
                self.run(f'f=open("{self.main_filename}","rb")')
            except MicroPythonError:
                raise FileNotFoundError(f"Could not find {self.main_filename} on device!")
    
            # read main txt file
            main_txt = ast.literal_eval(self.run('print(f.read())')).decode("utf-8")
    
            # find all functions
            matches = re.finditer(r"def\s+([^(]+)\((.*)\):", main_txt)
            for m in matches:
                name = m.group(1)
                # generate function
                func = lambda *args, func_name=name, **kwargs: self.run_func(func_name, *args, **kwargs)
                self.function_args[name] = m.group(2)
                self.function_callable[name] = func
                setattr(self, name, func)
    
        def __str__(self):
            txt = f"MicroPython Device at {self.ser_obj.port}, available functions:\n"
            if len(self.function_args) == 0:
                txt += "  none\n"
            else:
                for a, b in self.function_args.items():
                    txt += f"  {a}({b})\n"
            return txt[:-1]
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
    
        def update(self):
            for name in self.function_args:
                delattr(self, name)
            self.function_args = {}
            self.function_callable = {}
            self.init()
            self.read_functions()
    
        def upload_main(self, filename):
            self.upload(filename, self.main_filename)
    
        def upload(self, filename, destination=None, update=True):
            if destination is None:
                _, destination = os.path.split(filename)
            with open(filename, "r") as f:
                file_txt = f.read()
            self.run(f'f = open("{destination}", "wb")')
            self.run(f'f.write({repr(file_txt)})')
            self.run(f'f.close()')
            if update:
                self.update()
    
        def remove(self, filename):
            self.run('import os')
            self.run(f'os.remove("{filename}")')
    
        def close(self):
            self.ser_obj.close()