Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Tammo Jan Dijkema
HPIB
Commits
f687613b
Commit
f687613b
authored
May 04, 2018
by
Tammo Jan Dijkema
Browse files
Make thread safe, remove port searching logic for now
parent
edb25e27
Changes
2
Show whitespace changes
Inline
Side-by-side
camrasdevices.py
View file @
f687613b
import
hpib
from
hpib
import
HpibDevice
import
astropy.units
as
u
import
astropy.units
as
u
from
astropy.units
import
Quantity
from
astropy.units
import
Quantity
import
threading
class
CamrasHpibDevice
(
HpibDevice
):
import
hpib
"""Wrapper around HPIB commands"""
import
serial.threaded
_CamrasHpibDevice__port
=
None
serial_port
=
serial
.
Serial
(
"/dev/ttyUSB0"
)
_CamrasHpibDevice__port_lock
=
threading
.
Lock
()
command_thread
=
serial
.
threaded
.
ReaderThread
(
serial_port
,
hpib
.
GPIBProtocol
)
class
CamrasHpibDevice
(
object
):
"""Wrapper around HPIB commands"""
def
__init__
(
self
,
address
):
def
__init__
(
self
,
address
):
if
not
CamrasHpibDevice
.
_CamrasHpibDevice__port
:
if
not
command_thread
.
is_alive
():
with
CamrasHpibDevice
.
_CamrasHpibDevice__port_lock
:
command_thread
.
start
()
_port
=
hpib
.
PrologixGpibUsb
()
self
.
hpib_address
=
address
super
(
CamrasHpibDevice
,
self
).
__init__
(
address
,
_port
)
@
property
@
property
def
frequency
(
self
):
def
frequency
(
self
):
"""Returns the frequency of the device as an astropy Quantity.
"""Returns the frequency of the device as an astropy Quantity.
Throws a RuntimeError if no response"""
Throws a RuntimeError if no response"""
freq_str
=
self
.
query
(
"freq?"
)
freq_str
=
command_thread
.
protocol
.
command
(
"freq?"
,
address
=
self
.
hpib_address
)
if
len
(
freq_str
)
==
0
:
if
len
(
freq_str
)
==
0
:
raise
RuntimeError
(
"Camras device at address {} is not responding"
.
format
(
self
.
_hpib_address
))
raise
RuntimeError
(
"Camras device at address {} is not responding"
.
format
(
self
.
_hpib_address
))
return
int
(
float
(
freq_str
))
*
u
.
Hz
return
int
(
float
(
freq_str
))
*
u
.
Hz
...
@@ -30,7 +29,7 @@ class CamrasHpibDevice(HpibDevice):
...
@@ -30,7 +29,7 @@ class CamrasHpibDevice(HpibDevice):
"""Set the device to the specified frequency. Throws a RuntimeError if failed"""
"""Set the device to the specified frequency. Throws a RuntimeError if failed"""
if
isinstance
(
freq
,
Quantity
):
if
isinstance
(
freq
,
Quantity
):
freq
=
freq
.
to
(
u
.
Hz
).
value
freq
=
freq
.
to
(
u
.
Hz
).
value
self
.
command
(
"freq {:d} Hz"
.
format
(
int
(
freq
)))
command_thread
.
protocol
.
command
(
"freq {:d} Hz"
.
format
(
int
(
freq
))
,
address
=
self
.
hpib_address
)
new_freq
=
self
.
frequency
new_freq
=
self
.
frequency
if
new_freq
.
to
(
u
.
Hz
).
value
!=
int
(
freq
):
if
new_freq
.
to
(
u
.
Hz
).
value
!=
int
(
freq
):
raise
RuntimeError
(
"Setting frequency failed: tried to set to {}, it is now {}"
.
format
(
raise
RuntimeError
(
"Setting frequency failed: tried to set to {}, it is now {}"
.
format
(
...
@@ -39,17 +38,17 @@ class CamrasHpibDevice(HpibDevice):
...
@@ -39,17 +38,17 @@ class CamrasHpibDevice(HpibDevice):
class
Receiver
(
CamrasHpibDevice
):
class
Receiver
(
CamrasHpibDevice
):
"""Wrapper around HPIB commands for the Rohde & Schwartz receiver"""
"""Wrapper around HPIB commands for the Rohde & Schwartz receiver"""
def
__init__
(
self
):
def
__init__
(
self
,
**
kwargs
):
super
(
Receiver
,
self
).
__init__
(
1
)
super
(
Receiver
,
self
).
__init__
(
1
,
**
kwargs
)
class
LocalOscillator
(
CamrasHpibDevice
):
class
LocalOscillator
(
CamrasHpibDevice
):
"""Wrapper around HPIB commands for the local oscillator"""
"""Wrapper around HPIB commands for the local oscillator"""
def
__init__
(
self
):
def
__init__
(
self
,
**
kwargs
):
super
(
LocalOscillator
,
self
).
__init__
(
28
)
super
(
LocalOscillator
,
self
).
__init__
(
28
,
**
kwargs
)
class
ClockGenerator
(
CamrasHpibDevice
):
class
ClockGenerator
(
CamrasHpibDevice
):
"""Wrapper around HPIB commands for the clock generator (should be at 140MHz)"""
"""Wrapper around HPIB commands for the clock generator (should be at 140MHz)"""
def
__init__
(
self
):
def
__init__
(
self
,
**
kwargs
):
super
(
SignalGenerator
,
self
).
__init__
(
14
)
super
(
SignalGenerator
,
self
).
__init__
(
14
,
**
kwargs
)
hpib.py
View file @
f687613b
import
serial
import
serial
import
serial.threaded
import
threading
import
time
import
time
import
re
import
re
import
platform
import
platform
import
sys
import
sys
from
threading
import
Lock
import
datetime
try
:
class
PortDevice
(
object
):
import
queue
def
command
(
self
,
hpib_address
,
command
):
except
ImportError
:
raise
NotImplementedError
import
Queue
as
queue
def
query
(
self
,
hpib_address
,
query
,
count
=
80
):
raise
NotImplementedError
class
GPIBProtocol
(
serial
.
threaded
.
LineReader
):
TERMINATOR
=
b
'
\n
'
def
set_local
(
self
,
hpib_address
):
raise
NotImplementedError
def
__init__
(
self
):
super
(
GPIBProtocol
,
self
).
__init__
()
self
.
responses
=
queue
.
Queue
()
class
PrologixGpibUsb
(
PortDevice
):
self
.
lock
=
threading
.
Lock
()
_serialPort
=
None
_devices
=
"Not scanned."
def
init_hpib
(
self
):
with
self
.
lock
:
def
__init__
(
self
,
port
=
""
,
timeout
=
1
,
port_format
=
"%d"
,
max_device_num
=
9
,
verbose
=
False
,
self
.
write_line
(
"++savecfg 0"
)
find_devices
=
False
):
self
.
write_line
(
"++auto 0"
)
if
port
==
""
:
self
.
write_line
(
"++eoi 1"
)
max_device_num
,
port
,
port_format
=
self
.
_set_default_platform_values
()
self
.
write_line
(
"++eos 2"
)
self
.
write_line
(
"++eot_enable 0"
)
self
.
_validate_port
(
port
)
self
.
write_line
(
"++eot_char 0"
)
self
.
_lock
=
Lock
()
self
.
write_line
(
"++read_tmo_ms 500"
)
if
verbose
:
def
command
(
self
,
command
,
timeout
=
0.2
,
address
=
None
):
print
(
"Looking for ports such as %s in range %d on %s"
%
\
with
self
.
lock
:
(
port
,
max_device_num
,
platform
.
system
()))
if
address
:
self
.
write_line
(
"++addr {:d}"
.
format
(
address
))
self
.
_find_available_port
(
max_device_num
,
port
,
port_format
,
timeout
,
verbose
)
self
.
write_line
(
command
)
if
"?"
in
command
:
self
.
_initialize_prologix_gpi
()
self
.
write_line
(
"++read eoi"
)
# The command was a query, wait for response
if
find_devices
:
line
=
self
.
responses
.
get
(
timeout
=
timeout
)
self
.
_find_devices
()
return
line
def
_set_default_platform_values
(
self
):
if
platform
.
system
()
==
"Linux"
:
port
=
"/dev/ttyUSB0"
port_format
=
"%d"
max_device_num
=
99
elif
platform
.
system
()
==
"Darwin"
:
port
=
"/dev/ttys000"
port_format
=
"%03d"
max_device_num
=
999
else
:
else
:
raise
NotImplementedError
(
"Windows is not supported"
)
pass
return
max_device_num
,
port
,
port_format
def
_validate_port
(
self
,
port
):
self
.
_rematch
=
re
.
search
(
r
"(/dev/tty.*[^0-9])([0-9]+$)"
,
port
)
if
not
self
.
_rematch
:
raise
ValueError
(
port
)
def
_find_available_port
(
self
,
max_device_num
,
port
,
port_format
,
timeout
,
verbose
):
while
int
(
self
.
_rematch
.
group
(
2
))
<
max_device_num
:
if
verbose
:
print
(
"trying "
,
port
)
try
:
self
.
_serialPort
=
serial
.
Serial
(
port
,
timeout
=
timeout
)
break
except
serial
.
SerialException
:
port
=
self
.
_rematch
.
group
(
1
)
+
(
port_format
%
(
int
(
self
.
_rematch
.
group
(
2
))
+
1
))
self
.
_rematch
=
re
.
search
(
"(/dev/tty.*[^0-9])([0-9]+$)"
,
port
)
if
self
.
_serialPort
is
None
:
raise
serial
.
SerialException
(
"Cannot find port"
)
def
_write_serial
(
self
,
commandString
):
if
sys
.
version_info
>=
(
3
,
0
):
self
.
_serialPort
.
write
(
bytes
(
commandString
,
'utf-8'
))
else
:
self
.
_serialPort
.
write
(
commandString
)
def
_initialize_prologix_gpi
(
self
):
with
self
.
_lock
:
self
.
_write_serial
(
"++savecfg 0"
+
chr
(
10
))
self
.
_write_serial
(
"++auto 0"
+
chr
(
10
))
self
.
_write_serial
(
"++eoi 1"
+
chr
(
10
))
self
.
_write_serial
(
"++eos 2"
+
chr
(
10
))
self
.
_write_serial
(
"++eot_enable 0"
+
chr
(
10
))
self
.
_write_serial
(
"++eot_char 0"
+
chr
(
10
))
self
.
_write_serial
(
"++read_tmo_ms 500"
+
chr
(
10
))
def
_find_devices
(
self
):
self
.
_devices
=
[]
for
address
in
range
(
0
,
31
):
commandStr
=
"++addr "
+
str
(
address
)
+
chr
(
10
)
+
\
"*idn?"
+
chr
(
10
)
+
\
"++read"
+
chr
(
10
)
with
self
.
_lock
:
self
.
_write_serial
(
commandStr
)
returnStr
=
self
.
_serialPort
.
readline
()
if
returnStr
!=
""
:
self
.
_devices
+=
[
address
,
returnStr
]
def
command
(
self
,
hpib_address
,
command
):
command
=
"++addr "
+
str
(
hpib_address
)
+
chr
(
10
)
+
command
with
self
.
_lock
:
self
.
_write_serial
(
command
+
chr
(
10
))
def
query
(
self
,
hpib_address
,
query
,
count
=
80
):
query
=
"++addr "
+
str
(
hpib_address
)
+
chr
(
10
)
+
query
query
+=
chr
(
10
)
+
"++read eoi"
+
chr
(
10
)
with
self
.
_lock
:
self
.
_write_serial
(
query
)
line
=
self
.
_serialPort
.
readline
()
return
line
[:
-
1
]
def
set_local
(
self
,
hpib_address
):
# self._commandStr = "++addr "+str(hpib_address)+chr(10)+"loc "+str(hpib_address)+chr(10)
# self._commandCount = self._write_serial(self._commandStr)
return
def
get_devices
(
self
):
return
self
.
_devices
class
GPIB_232_485CT_A
(
PortDevice
):
def
__init__
(
self
,
port
=
"/dev/ttyS0"
,
baudrate
=
9600
,
bytesize
=
serial
.
EIGHTBITS
,
parity
=
serial
.
PARITY_NONE
,
stopbits
=
serial
.
STOPBITS_ONE
,
interCharTimeout
=
None
,
pause
=
0
,
timeout
=
10
):
self
.
_serialPort
=
serial
.
Serial
(
port
,
baudrate
,
bytesize
,
parity
,
stopbits
,
interCharTimeout
,
timeout
)
self
.
_serialPort
.
flushInput
()
self
.
_pause
=
pause
def
command
(
self
,
hpib_address
,
command
):
self
.
_commandStr
=
"wr "
+
str
(
hpib_address
)
+
chr
(
10
)
+
command
+
chr
(
13
)
self
.
_commandCount
=
self
.
_write_serial
(
self
.
_commandStr
)
def
query
(
self
,
hpib_address
,
query
,
count
=
80
):
# flushInput should not be necessary
# self._serialPort.flushInput()
self
.
_commandStr
=
"wr "
+
str
(
hpib_address
)
+
chr
(
10
)
+
query
+
chr
(
13
)
self
.
_commandCount
=
self
.
_write_serial
(
self
.
_commandStr
)
self
.
_queryStr
=
"rd #"
+
str
(
count
)
+
" "
+
str
(
hpib_address
)
+
chr
(
13
)
self
.
_queryCount
=
self
.
_write_serial
(
self
.
_queryStr
)
self
.
_returnStr
=
self
.
_serialPort
.
read
(
count
)
self
.
_returnCnt
=
int
(
self
.
_serialPort
.
readline
())
return
self
.
_returnStr
[:
self
.
_returnCnt
-
1
]
def
set_local
(
self
,
hpib_address
):
self
.
_commandStr
=
"loc "
+
str
(
hpib_address
)
+
chr
(
13
)
self
.
_commandCount
=
self
.
_write_serial
(
self
.
_commandStr
)
class
HpibDevice
(
object
):
def
__init__
(
self
,
hpib_address
,
port
,
keepLocal
=
False
,
pause
=
0
):
self
.
_hpib_address
=
hpib_address
self
.
_port
=
port
self
.
_keepLocal
=
keepLocal
self
.
_pause
=
pause
def
keep_remote
(
self
,
keepRemote
=
False
):
self
.
_keepLocal
=
not
keepRemote
def
set_local
(
self
,
set2Local
=
True
):
if
set2Local
:
self
.
_port
.
set_local
(
self
.
_hpib_address
)
def
command
(
self
,
commandStr
):
self
.
_port
.
command
(
self
.
_hpib_address
,
commandStr
)
self
.
set_local
(
self
.
_keepLocal
)
time
.
sleep
(
self
.
_pause
)
def
query
(
self
,
queryStr
):
answerStr
=
self
.
_port
.
query
(
self
.
_hpib_address
,
queryStr
)
self
.
set_local
(
self
.
_keepLocal
)
return
answerStr
def
get_devices
(
self
):
def
handle_line
(
self
,
line
):
return
[
"Not implemented"
]
self
.
responses
.
put
(
line
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment