Skip to content
Snippets Groups Projects
Select Git revision
  • 8f55258f60a8ca78a984862aa74ba711ed7345d1
  • master default
  • neil
  • hpgl
  • sw-shawn-liu
  • ppa
  • fran
  • trotec-port
  • fs-hotfix
  • jake
  • ml-mods
  • fabmo-app
12 results

files.html

Blame
  • serialserver.py 3.30 KiB
    #!/usr/bin/env python3
    #
    # serialserver.py
    #    send a serial file with handshaking
    #
    # Neil Gershenfeld 1/17/20
    #
    # This work may be reproduced, modified, distributed,
    # performed, and displayed for any purpose, but must
    # acknowledge the mods project. Copyright is
    # retained and must be preserved. The work is provided
    # as is; no warranty is provided, and users accept all 
    # liability.
    #
    # imports
    #
    import sys,serial,asyncio,websockets,json
    #
    # command line
    #
    #
    # expect exactly two arguments: the client address and the port
    #
    if (len(sys.argv) != 3):
       print("command line: serialserver.py address port")
       print("   address = client address")
       print("   port = port")
       #
       # a usage error is a failure, so report it with a nonzero status
       #
       sys.exit(1)
    #
    # client is the only remote address whose messages receive() acts on
    #
    client = sys.argv[1]
    #
    # port is the TCP port the WebSocket server listens on
    #
    try:
       port = int(sys.argv[2])
    except ValueError:
       print("port must be an integer: "+sys.argv[2])
       sys.exit(1)
    #
    # WebSocket handler
    #
    def _open_port(device,speed,flow):
       """Open and return a non-blocking serial port with the given flow control.

       device is the serial device name, speed the baud rate, and flow one
       of "xonxoff", "rtscts", "dsrdtr" or "none".

       Raises ValueError for an unknown flow name and serial.SerialException
       when the port cannot be opened or flushed.
       """
       #
       # map each supported flow control name to its Serial keyword
       #
       options = {
          "xonxoff":{"xonxoff":True},
          "rtscts":{"rtscts":True},
          "dsrdtr":{"dsrdtr":True},
          "none":{}}
       if (flow not in options):
          raise ValueError(f"unknown flow control: {flow}")
       port = serial.Serial(device,baudrate=speed,timeout=0,**options[flow])
       try:
          port.flushInput()
          port.flushOutput()
       except serial.SerialException:
          #
          # do not leak a half-initialized port
          #
          port.close()
          raise
       return port

    async def _send_data(websocket,port,flow,data):
       """Write data to port one item at a time, reporting progress.

       For "dsrdtr" and "rtscts" flow control each write waits until the
       DSR or CTS line is asserted. After every item the percentage sent
       so far is sent back over websocket as a string.
       """
       total = len(data)
       for count,item in enumerate(data,start=1):
          print(item)
          #
          # hardware handshake: yield to the event loop while waiting,
          # so other connections are not stalled
          #
          if (flow == "dsrdtr"):
             while (not port.getDSR()):
                await asyncio.sleep(0.001)
          elif (flow == "rtscts"):
             while (not port.getCTS()):
                await asyncio.sleep(0.001)
          #
          # JSON strings yield str characters, but pySerial only
          # accepts bytes, so encode before writing
          #
          chunk = item.encode() if isinstance(item,str) else item
          port.write(chunk)
          port.flush()
          await websocket.send(str((100.0*count)/total))

    async def receive(websocket,path=None):
       """Handle one WebSocket connection carrying JSON serial requests.

       Each message is a JSON object with a 'type' key:
          'open'    opens 'device' at 'baud' with 'flow' flow control
          'command' writes 'contents' to the open port
          'file'    writes 'contents' to the open port
       Messages from any address other than the module-level client are
       ignored. Errors are printed and sent to the client as strings;
       progress is sent as a percentage string.

       path is accepted for websockets versions that pass it to the
       handler and is unused; it defaults to None so versions that pass
       only the connection also work.
       """
       s = None
       flow = None
       try:
          while True:
             try:
                msg = await websocket.recv()
             except websockets.ConnectionClosed:
                #
                # client went away: leave the loop and release the port
                #
                break
             address = websocket.remote_address[0]
             if (address != client):
                #
                # reject client
                #
                print("connection rejected from "+address)
                continue
             #
             # accept client
             #
             print("connection accepted from "+address)
             request = json.loads(msg)
             kind = request['type']
             if (kind == 'open'):
                #
                # open port
                #
                device = request['device']
                speed = int(request['baud'])
                requested_flow = request['flow']
                print(f"open {device} at {speed} with {requested_flow}")
                #
                # release any previously opened port before re-opening
                #
                if (s is not None):
                   s.close()
                   s = None
                try:
                   s = _open_port(device,speed,requested_flow)
                   flow = requested_flow
                except (serial.SerialException,ValueError) as err:
                   print(err)
                   await websocket.send(str(err))
             elif (kind in ('command','file')):
                #
                # send command or file (both are written the same way)
                #
                print('send '+kind)
                if (s is None):
                   error = "serial port is not open"
                   print(error)
                   await websocket.send(error)
                   continue
                try:
                   await _send_data(websocket,s,flow,request['contents'])
                except serial.SerialException as err:
                   print(err)
                   await websocket.send(str(err))
       finally:
          #
          # always close the serial port when the connection ends
          #
          if (s is not None):
             s.close()
    #
    # start server
    #
    async def serve_forever():
       """Serve receive() on localhost at the configured port until interrupted."""
       #
       # create the server inside a running event loop; the context
       # manager closes it when the coroutine is cancelled
       #
       async with websockets.serve(receive,'localhost',port):
          #
          # a future that never completes keeps the server running
          #
          await asyncio.Future()
    asyncio.run(serve_forever())