"""HPIB wrappers for devices in the Dwingeloo telescope

This module contains some classes for conveniently working with the devices in
the Dwingeloo Telescope that communicate over GPIB (General Purpose Interface
Bus) or HPIB (presumably the HP variant of GPIB).

Currently, three of these devices exist, on different HPIB addresses:

 - Receiver (address 1)
 - Local Oscillator (address 28), used for mixing signals to downconvert them
 - Clock Generator (address 14), used as the clock for the FPGA

For each of the three devices a class exists. Setting and getting the frequency
of these devices is done by using their `frequency` attribute.

Example:
    >>> import camrasdevices
    >>> lo = camrasdevices.LocalOscillator()
    >>> lo.frequency
    >>> lo.frequency = 1.1e9
    >>> lo.frequency

By default, the setter method of `frequency` checks if setting the device
succeeded. This is a blocking operation, but the HPIB bus is quite fast.

    >>> lo.frequency = 1.e18
    Traceback (most recent call last):
    ValueError: Setting frequency failed: tried to set to 1e+18, it is now 1100000000.0 Hz

This checking can be disabled:

    >>> non_checking_lo = camrasdevices.LocalOscillator(check_frequency=False)
    >>> non_checking_lo.frequency = 1.e18
    >>> non_checking_lo.frequency

Other HPIB commands can be sent using the `command` function:

    >>> lo.command("loc 28") # Set device to local mode

Also generic HPIB commands (without specific address) can be sent:

    >>> lo.command("++eoi 1", prepend_address=False)

The CamrasHpibDevice class spawns one thread (shared with all instances) to
perform the HPIB commands. This is to make sure that instances living in
separate threads do not interfere with each other. The thread is a daemon, so
it will be killed on program exit.

On initialization of the first CamrasHpibDevice (or subclass), the command
thread is initialized, and some initialization commands are sent to the HPIB
bus to tell it to use the appropriate line endings etc.
"""

import threading

import astropy.units as u
from astropy.units import Quantity
import serial.threaded

import hpib


class CamrasHpibDevice(object):
    """Wrapper around HPIB commands

    Args:
        address (int): GPIB address
        check_frequency (bool): check the frequency after setting it, raise an
                                error if setting the frequency failed
        return_to_local (bool): return the device to local mode after each
                                HPIB command. Some devices switch out of manual
                                mode when a HPIB command is set; undo this.

    Attributes:
        hpib_address (int): GPIB address
        check_frequency (bool): check the frequency after setting it, raise an
                                error if setting the frequency failed
        return_to_local (bool): return the device to local mode after each
                                HPIB command. Some devices switch out of manual
                                mode when a HPIB command is set; undo this.
        command_thread: class-level serial reader thread shared by all
                        instances; None until the first instance is created
    """

    command_thread = None
    _initialization_lock = threading.Lock()

    def __init__(self, address, check_frequency=True, return_to_local=False):
        self.hpib_address = address
        self._check_frequency = check_frequency
        self.return_to_local = return_to_local

        # The lock makes sure only one instance sets up the shared thread,
        # even when devices are constructed from several threads at once.
        with CamrasHpibDevice._initialization_lock:
            if CamrasHpibDevice.command_thread is None:
                serial_port = serial.Serial("/dev/ttyUSB0")
                reader = serial.threaded.ReaderThread(serial_port,
                                                      hpib.GPIBProtocol)
                reader.start()
                try:
                    # Blocks until the protocol is connected to the port.
                    reader.connect()
                    reader.protocol.init_hpib()
                except Exception:
                    # Do not leave a half-initialised thread (and an open
                    # serial port) behind for later instances to reuse.
                    reader.close()
                    raise
                # Publish the thread only once it is fully usable.
                CamrasHpibDevice.command_thread = reader

    @property
    def check_frequency(self):
        """bool: whether the `frequency` setter reads back and verifies."""
        return self._check_frequency

    @check_frequency.setter
    def check_frequency(self, value):
        self._check_frequency = value

    def command(self, command_string, prepend_address=True,
                return_to_local=False):
        """Send a command to the HPIB command thread

        Args:
            command_string (str): HPIB command
            prepend_address (bool): first send the address of this device
            return_to_local (bool): send a 'return to local' command after the
                                    command. This is also done when the
                                    instance attribute `return_to_local` is
                                    set.

        Returns:
            str: output of the command (in case it was a query)
        """
        address = self.hpib_address if prepend_address else None

        # Honour both the per-call request and the instance-wide setting.
        to_local = bool(return_to_local or self.return_to_local)

        return CamrasHpibDevice.command_thread.protocol.command(
            command_string, address=address, return_to_local=to_local)

    @property
    def frequency(self):
        """Frequency of the device as an astropy Quantity.

        When setting, a plain number is interpreted as Hz; a Quantity is
        converted to Hz. The value is truncated to an integer number of Hz
        before it is sent to the device.

        Returns:
            Quantity: frequency (in Hz) of the device

        Raises:
            RuntimeError: If the device does not respond
            ValueError: On setting, if frequency checking is enabled and the
                        device sticks to a different frequency (e.g. if the
                        specified frequency is outside the accepted range for
                        the device)
        """
        freq_str = self.command("freq?")

        if not freq_str:
            raise RuntimeError("Camras device at address {} is not responding".format(
                self.hpib_address))

        return int(float(freq_str)) * u.Hz

    @frequency.setter
    def frequency(self, freq):
        """Set the device to the specified frequency.

        Args:
            freq (Union[float, Quantity]): new frequency. If no unit present,
                                           assume Hz

        Raises:
            RuntimeError: If checking is enabled and the device does not
                          respond to the read-back query
            ValueError: If checking is enabled and the device sticks to a
                        different frequency (e.g. if the specified frequency is
                        outside the accepted range for the device)
        """
        if isinstance(freq, Quantity):
            freq = freq.to(u.Hz).value

        requested_hz = int(freq)
        self.command("freq {:d} Hz".format(requested_hz))

        # Reading back is a blocking query; skip it when checking is disabled.
        if not self._check_frequency:
            return

        new_freq = self.frequency
        if new_freq.to(u.Hz).value != requested_hz:
            raise ValueError("Setting frequency failed: tried to set to {}, it is now {}".format(
                freq, new_freq))


class Receiver(CamrasHpibDevice):
    """Wrapper around HPIB commands for the Rohde & Schwarz receiver

    Keyword arguments (`check_frequency`, `return_to_local`) are passed on to
    CamrasHpibDevice; the HPIB address is fixed at 1.
    """

    def __init__(self, **kwargs):
        super(Receiver, self).__init__(1, **kwargs)


class LocalOscillator(CamrasHpibDevice):
    """Wrapper around HPIB commands for the local oscillator

    Keyword arguments (`check_frequency`, `return_to_local`) are passed on to
    CamrasHpibDevice; the HPIB address is fixed at 28.
    """

    def __init__(self, **kwargs):
        super(LocalOscillator, self).__init__(28, **kwargs)


class ClockGenerator(CamrasHpibDevice):
    """Wrapper around HPIB commands for the clock generator (should be at 140MHz)

    Keyword arguments (`check_frequency`, `return_to_local`) are passed on to
    CamrasHpibDevice; the HPIB address is fixed at 14.
    """

    def __init__(self, **kwargs):
        super(ClockGenerator, self).__init__(14, **kwargs)