"""Serial-port adapters for talking to GPIB/HPIB instruments.

`PortDevice` is the adapter interface; `PrologixGpibUsb` and
`GPIB_232_485CT_A` implement it on top of a pyserial port, and
`HpibDevice` binds one bus address to an adapter.
"""

import platform
import re
import sys
import time
from threading import Lock

import serial

# Splits a tty device path into (prefix, trailing number), e.g.
# "/dev/ttyUSB0" -> ("/dev/ttyUSB", "0").
_PORT_PATTERN = re.compile(r"(/dev/tty.*[^0-9])([0-9]+$)")


class PortDevice(object):
    """Interface of an adapter that forwards text commands to bus addresses.

    Subclasses are expected to store an open pyserial port in
    ``self._serialPort`` before calling `_write_serial`.
    """

    _serialPort = None

    def _write_serial(self, commandString):
        """Write *commandString* to the serial port and return the write result.

        Text is encoded as UTF-8; a value that is already ``bytes`` (which
        includes the plain ``str`` type on Python 2) is written unchanged.
        """
        if not isinstance(commandString, bytes):
            commandString = commandString.encode('utf-8')
        return self._serialPort.write(commandString)

    def command(self, hpib_address, command):
        """Send *command* to the device at *hpib_address*."""
        raise NotImplementedError

    def query(self, hpib_address, query, count=80):
        """Send *query* to the device at *hpib_address* and return its reply."""
        raise NotImplementedError

    def set_local(self, hpib_address):
        """Return the device at *hpib_address* to local control."""
        raise NotImplementedError


class PrologixGpibUsb(PortDevice):
    """Adapter driven through ``++``-prefixed commands over a tty port.

    Args:
        port: Device path ending in a number (e.g. ``/dev/ttyUSB0``). When
            empty, the path, number format and search limit are chosen from
            the current platform.
        timeout: Read timeout handed to ``serial.Serial``.
        port_format: ``%``-format used to render the next port number while
            searching.
        max_device_num: Search stops once the port number reaches this value.
        verbose: Print the ports being tried.
        find_devices: Probe every bus address after opening the port.

    Raises:
        ValueError: *port* does not look like ``/dev/tty<name><number>``.
        NotImplementedError: *port* is empty and the platform is neither
            Linux nor Darwin.
        serial.SerialException: no port in the search range could be opened.
    """

    _serialPort = None
    _devices = "Not scanned."

    def __init__(self, port="", timeout=1, port_format="%d", max_device_num=9,
                 verbose=False, find_devices=False):
        if port == "":
            max_device_num, port, port_format = \
                self._set_default_platform_values()
        self._validate_port(port)
        self._lock = Lock()
        if verbose:
            print("Looking for ports such as %s in range %d on %s" % \
                  (port, max_device_num, platform.system()))
        self._find_available_port(max_device_num, port, port_format,
                                  timeout, verbose)
        self._initialize_prologix_gpi()
        if find_devices:
            self._find_devices()

    def _set_default_platform_values(self):
        """Return ``(max_device_num, port, port_format)`` for this platform."""
        system = platform.system()
        if system == "Linux":
            port = "/dev/ttyUSB0"
            port_format = "%d"
            max_device_num = 99
        elif system == "Darwin":
            port = "/dev/ttys000"
            port_format = "%03d"
            max_device_num = 999
        else:
            raise NotImplementedError("Windows is not supported")
        return max_device_num, port, port_format

    def _validate_port(self, port):
        """Parse *port* into prefix and number, or raise ``ValueError``."""
        self._rematch = _PORT_PATTERN.search(port)
        if not self._rematch:
            raise ValueError(port)

    def _find_available_port(self, max_device_num, port, port_format,
                             timeout, verbose):
        """Open the first port that works, counting up from *port*.

        Starts at the number parsed by `_validate_port` and increments it
        after every failed open until it reaches *max_device_num*.
        """
        while int(self._rematch.group(2)) < max_device_num:
            if verbose:
                print("trying ", port)
            try:
                self._serialPort = serial.Serial(port, timeout=timeout)
                break
            except serial.SerialException:
                next_number = int(self._rematch.group(2)) + 1
                port = self._rematch.group(1) + (port_format % next_number)
                self._rematch = _PORT_PATTERN.search(port)
        if self._serialPort is None:
            raise serial.SerialException("Cannot find port")

    def _initialize_prologix_gpi(self):
        """Send the fixed start-up configuration sequence to the adapter."""
        # NOTE(review): the meaning of each setting is defined by the adapter's
        # manual, not by this file -- confirm there before changing a value.
        for setting in ("++savecfg 0",
                        "++auto 0",
                        "++eoi 1",
                        "++eos 2",
                        "++eot_enable 0",
                        "++eot_char 0",
                        "++read_tmo_ms 500"):
            self._write_serial(setting + chr(10))

    def _find_devices(self):
        """Probe addresses 0..30 with ``*idn?`` and record those that answer.

        The result is a flat list alternating address and raw reply:
        ``[addr, reply, addr, reply, ...]``.
        """
        self._devices = []
        for address in range(0, 31):
            commandStr = "++addr " + str(address) + chr(10) + \
                         "*idn?" + chr(10) + \
                         "++read" + chr(10)
            self._write_serial(commandStr)
            returnStr = self._serialPort.readline()
            # readline() yields bytes on Python 3, so comparing against ""
            # would accept every address; test for emptiness instead.
            if returnStr:
                self._devices += [address, returnStr]

    def command(self, hpib_address, command):
        """Address *hpib_address* and send *command* without reading back."""
        command = "++addr " + str(hpib_address) + chr(10) + command
        with self._lock:
            self._write_serial(command + chr(10))

    def query(self, hpib_address, query, count=80):
        """Send *query* to *hpib_address* and return one reply line.

        The last character of the line read is dropped. *count* is accepted
        for interface compatibility and is not used here.
        """
        query = "++addr " + str(hpib_address) + chr(10) + query
        query += chr(10) + "++read eoi" + chr(10)
        with self._lock:
            self._write_serial(query)
            line = self._serialPort.readline()
        return line[:-1]

    def set_local(self, hpib_address):
        """Do nothing; this adapter sends no return-to-local command."""
        # A "++addr <n>" + "loc <n>" sequence was previously sketched here
        # but is disabled.
        return

    def get_devices(self):
        """Return the scan result, or ``"Not scanned."`` if no scan ran."""
        return self._devices


class GPIB_232_485CT_A(PortDevice):
    """Adapter driven through ``wr`` / ``rd`` / ``loc`` text commands.

    Args:
        port, baudrate, bytesize, parity, stopbits: Passed to
            ``serial.Serial`` in that order.
        interCharTimeout: Inter-character timeout for the serial port.
        pause: Stored on the instance; not used by this class itself.
        timeout: Read timeout for the serial port.
    """

    def __init__(self, port="/dev/ttyS0", baudrate=9600,
                 bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE,
                 stopbits=serial.STOPBITS_ONE, interCharTimeout=None,
                 pause=0, timeout=10):
        # The two timeouts must go by keyword: positionally they would land
        # on Serial's `timeout` and `xonxoff` parameters respectively.
        self._serialPort = serial.Serial(port, baudrate, bytesize, parity,
                                         stopbits, timeout=timeout,
                                         interCharTimeout=interCharTimeout)
        self._serialPort.flushInput()
        self._pause = pause

    def command(self, hpib_address, command):
        """Write *command* to the device at *hpib_address*."""
        self._commandStr = "wr " + str(hpib_address) + chr(10) + command + chr(13)
        self._commandCount = self._write_serial(self._commandStr)

    def query(self, hpib_address, query, count=80):
        """Write *query*, then read back up to *count* characters.

        After the data, one more line is read and parsed as the number of
        characters received; the reply is truncated to one less than that.
        """
        # flushInput should not be necessary
        # self._serialPort.flushInput()
        self._commandStr = "wr " + str(hpib_address) + chr(10) + query + chr(13)
        self._commandCount = self._write_serial(self._commandStr)
        self._queryStr = "rd #" + str(count) + " " + str(hpib_address) + chr(13)
        self._queryCount = self._write_serial(self._queryStr)
        self._returnStr = self._serialPort.read(count)
        self._returnCnt = int(self._serialPort.readline())
        return self._returnStr[:self._returnCnt - 1]

    def set_local(self, hpib_address):
        """Send the ``loc`` command for *hpib_address*."""
        self._commandStr = "loc " + str(hpib_address) + chr(13)
        self._commandCount = self._write_serial(self._commandStr)


class HpibDevice(object):
    """One instrument at a fixed bus address behind a `PortDevice`.

    Args:
        hpib_address: Bus address passed to every port call.
        port: Adapter object providing ``command``, ``query``, ``set_local``.
        keepLocal: When true, ``set_local`` is requested after each
            command/query.
        pause: Seconds to sleep after each command.
    """

    def __init__(self, hpib_address, port, keepLocal=False, pause=0):
        self._hpib_address = hpib_address
        self._port = port
        self._keepLocal = keepLocal
        self._pause = pause

    def keep_remote(self, keepRemote=False):
        """Set the keep-local flag to the inverse of *keepRemote*."""
        self._keepLocal = not keepRemote

    def set_local(self, set2Local=True):
        """Ask the port to return this device to local if *set2Local*."""
        if set2Local:
            self._port.set_local(self._hpib_address)

    def command(self, commandStr):
        """Send *commandStr*, apply the keep-local policy, then pause."""
        self._port.command(self._hpib_address, commandStr)
        self.set_local(self._keepLocal)
        time.sleep(self._pause)

    def query(self, queryStr):
        """Send *queryStr* and return the port's reply."""
        self._answerStr = self._port.query(self._hpib_address, queryStr)
        self.set_local(self._keepLocal)
        return self._answerStr

    def get_devices(self):
        """Return a placeholder; device listing is not implemented here."""
        return ["Not implemented"]