from typing import Any, Union import astropy.units as u from astropy.coordinates import EarthLocation from astropy.time import Time import threading import subprocess from vlsr import doppler_frequency from camrasdevices import LocalOscillator freq_hi = 1420.405751 * u.MHz def doppler_harm(sky_coordinate, time, tracking_frequency, location, variant): ra = sky_coordinate.ra.to(u.rad).value dec = sky_coordinate.dec.to(u.rad).value t = time.unix dt_lat = location.lat.to(u.rad).value dt_lon = location.lon.to(u.rad).value dt_alt = location.height.to(u.m).value argstring = "{ra} {dec} 2000 {time} {dt_lat} {dt_lon} {dt_alt}".format( ra=ra, dec=dec, time=t, dt_lat=dt_lat, dt_lon=dt_lon, dt_alt=dt_alt) doppler_executable = "/home/harm/bin/doppler_" + variant doppler_cmd = doppler_executable + " " + argstring \ + " " + str(tracking_frequency.to(u.MHz).value) freq_doppler = float(subprocess.Popen(doppler_cmd, stdout=subprocess.PIPE, shell=True ).stdout.read()) return freq_doppler * u.MHz def doppler_harm_bl(sky_coordinate, time, tracking_frequency, location): return doppler_harm(sky_coordinate, time, tracking_frequency, location, 'bl') def doppler_harm_mb(sky_coordinate, time, tracking_frequency, location): return doppler_harm(sky_coordinate, time, tracking_frequency, location, 'mb') def track_doppler(dt, lo, tracking_frequency=freq_hi, doppler_function=doppler_frequency, exit_event=None, timeout=0.3): """Sets the Local Oscillator to a frequency to correct for Doppler shift. When exit event is set, return the LO frequency to 1GHz. Args: dt (Telescope): a Telescope instance to read the current pointing from. lo (LocalOscillator): the local oscillator of which the frequency will be set. doppler_function (function): a function that computes the doppler frequency. This function gets a SkyCoord and a time (float), and should return a frequency Quantity timeout (float): timeout between updates of the Local Oscillator """ if not exit_event: exit_event = threading.Event() dt_lat = 52.812019 * u.deg dt_lon = 6.396169 * u.deg dt_alt = 25.0 * u.m dt_loc = EarthLocation.from_geodetic(lat=dt_lat, lon=dt_lon, height=dt_alt) while not exit_event.is_set(): sky_coordinate = dt.radec if sky_coordinate: freq_doppler = doppler_function(sky_coordinate, Time.now(), tracking_frequency, dt_loc) dfreq = 1 * u.GHz + tracking_frequency - freq_doppler lo.frequency = dfreq exit_event.wait(timeout=timeout) lo.frequency = 1 * u.GHz exit_event.clear()