Skip to content

Commit

Permalink
test/AxisMr: Clean up moveWait()
Browse files Browse the repository at this point in the history
Use the functionality of the motorRecord:
When writing to the .VAL field (and others) we can wait for a callback.
Use this: Calculate the timeout and wait.

This needs some more changes, e.g. in AxisCom to allow an exeption
not to be thrown, the same feature that p4p already has.
p4p will not throw an exception, if asked to do so, but return it.
We use a different approach: put() returns True or False.
Or throws an exception. Whatever throw=throw asks for.

Changes to be committed:
    modified:   pytests36/140_Record-VAL_VMAX.py
    modified:   pytests36/141_Record_NTM.py
    modified:   pytests36/160-Ethercat-MoveAbs-STOP.py
    modified:   pytests36/AxisCom.py
    modified:   pytests36/AxisMr.py
  • Loading branch information
tboegi committed Nov 12, 2024
1 parent de5eaa4 commit 2530310
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 22 deletions.
4 changes: 2 additions & 2 deletions test/pytests36/140_Record-VAL_VMAX.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ def motorPositionTC(self, tc_no, destination, velocity):
if velocity != self.velo:
self.axisCom.put(".VELO", velocity)

self.axisMr.moveWait(tc_no, destination)
testPassed = self.axisMr.moveWait(tc_no, destination, throw=False)
if velocity != self.velo:
self.axisCom.put(".VELO", self.velo)

UserPosition = self.axisCom.get(".RBV", use_monitor=False)
print(f"{tc_no} postion={UserPosition:f} destination={destination:f}")
testPassed = self.axisMr.postMoveCheck(tc_no)
testPassed = testPassed and self.axisMr.postMoveCheck(tc_no)
if testPassed:
self.axisCom.putDbgStrToLOG("Passed " + str(tc_no), wait=True)
else:
Expand Down
7 changes: 5 additions & 2 deletions test/pytests36/141_Record_NTM.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ def moveVALnewRBVnewValNtmRtryDly(
velo = oldVELO
if vmax > 0.0:
self.axisCom.put(".VELO", vmax)
self.axisMr.moveWait(tc_no, startpos)
testPassed = self.axisMr.moveWait(tc_no, startpos, throw=False)
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam}:{lineno()} {tc_no} testPassed={testPassed}"
)

# Max velo: Move the whole range not faster than 4 seconds
hlm = self.axisCom.get(".HLM")
Expand Down Expand Up @@ -100,7 +103,7 @@ def moveVALnewRBVnewValNtmRtryDly(
)
self.axisMr.waitForStop(tc_no, timeout + dly + 1.0)
postMoveCheckOK = self.axisMr.postMoveCheck(tc_no)
testPassed = valueChangedOK and postMoveCheckOK
testPassed = testPassed and valueChangedOK and postMoveCheckOK
print(
f"{tc_no} moveVALnewRBVnewValNtmRtryDly valueChangedOK={valueChangedOK} postMoveCheckOK={postMoveCheckOK}"
)
Expand Down
34 changes: 22 additions & 12 deletions test/pytests36/160-Ethercat-MoveAbs-STOP.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ class Test(unittest.TestCase):
hlm = axisCom.get(".HLM")
llm = axisCom.get(".LLM")
per10_UserPosition = round((9 * llm + 1 * hlm) / 10)

msta = int(axisCom.get(".MSTA"))

print(f"llm={llm:f} hlm={hlm:f}")
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam}:{lineno()} llm={llm:.2f} hlm={hlm:.2f}"
)

# Make sure that motor is homed
def test_TC_1601(self):
Expand All @@ -40,19 +39,30 @@ def test_TC_1601(self):

# per10 UserPosition
def test_TC_1602(self):
if self.msta & self.axisMr.MSTA_BIT_HOMED:
tc_no = "1602"
print(f"{tc_no}")
self.axisMr.moveWait(tc_no, self.per10_UserPosition)
UserPosition = self.axisCom.get(".RBV", use_monitor=False)
tc_no = 1602
self.axisCom.putDbgStrToLOG("Start " + str(tc_no), wait=True)
testPassed = False
msta = int(self.axisCom.get(".MSTA"))
if msta & self.axisMr.MSTA_BIT_HOMED:
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam}:{lineno()} {tc_no} jog_start_pos={self.per10_UserPosition:.2f}"
)
testPassed = self.axisMr.moveWait(
tc_no, self.per10_UserPosition, throw=False
)
rbv = self.axisCom.get(".RBV", use_monitor=False)
print(
"%s postion=%f jog_start_pos=%f"
% (tc_no, UserPosition, self.per10_UserPosition)
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam}:{lineno()} {tc_no} rbv={rbv:.2f} testPassed={testPassed}"
)
if testPassed:
self.axisCom.putDbgStrToLOG("Passed " + str(tc_no), wait=True)
else:
self.axisCom.putDbgStrToLOG("Failed " + str(tc_no), wait=True)
assert testPassed

# stress test; start and stop the quickly..
def test_TC_1603(self):
tc_no = "1603"
tc_no = 1603

msta = int(self.axisCom.get(".MSTA"))
nErrorId = self.axisCom.get("-ErrId")
Expand Down
12 changes: 9 additions & 3 deletions test/pytests36/AxisCom.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,21 @@ def put(self, pvsuf, value, wait=False, timeout=5.0, throw=True):
)
if self.ctxt is not None:
# p4p
ret = self.ctxt.put(pvname, value, timeout=timeout, wait=wait, throw=throw)
if throw:
# When p4p should throw an exception, do that
return self.ctxt.put(
pvname, value, timeout=timeout, wait=wait, throw=True
)
# else
ex = self.ctxt.put(pvname, value, timeout=timeout, wait=wait, throw=False)
if self.log_debug:
if ret is None:
if ex is None:
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam} put {fullname} value={value}"
)
else:
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam} put {fullname} value={value} ret={ret}"
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {filnam} put {fullname} value={value} ex={ex}"
)
if ret is None:
return True
Expand Down
40 changes: 37 additions & 3 deletions test/pytests36/AxisMr.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,16 +691,50 @@ def moveIntoLS(

return passed

def moveWait(self, tc_no, destination, field=".VAL", throw=True):
def moveWait(self, tc_no, destination, throw=True):
start_change_cnt_dmov_true = self.axisCom.get_change_cnts("dmov_true")
start_change_cnt_dmov_false = self.axisCom.get_change_cnts("dmov_false")
timeout = 30
rbv = self.axisCom.get(".RBV")
acceleration = self.axisCom.get(".ACCL")
velocity = self.axisCom.get(".VELO")
timeout += 2 * acceleration + 1.0
if velocity > 0:
distance = math.fabs(self.axisCom.get(".RBV") - destination)
distance = math.fabs(rbv - destination)
timeout += distance / velocity

return self.axisCom.put(field, destination, wait=True, timeout=timeout)
mov1_change_cnt_dmov_true = self.axisCom.get_change_cnts("dmov_true")
mov1_change_cnt_dmov_false = self.axisCom.get_change_cnts("dmov_false")
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {tc_no}:moveWait destination={destination:.2f} rbv={rbv:.2f} mov1_change_cnt_dmov_true={mov1_change_cnt_dmov_true} mov1_change_cnt_dmov_false={mov1_change_cnt_dmov_false}"
)
# The motorRecord will handle even situations like rbv=0.501 and val=5.00
ret = self.axisCom.put(
".VAL", destination, wait=True, timeout=timeout, throw=throw
)
timeToWait = 1.0 # wait max 1 second for the callbacks
while timeToWait > 0.0:
mov1_change_cnt_dmov_true = self.axisCom.get_change_cnts("dmov_true")
mov1_change_cnt_dmov_false = self.axisCom.get_change_cnts("dmov_false")
num_change_cnt_dmov_true = (
mov1_change_cnt_dmov_true - start_change_cnt_dmov_true
)
num_change_cnt_dmov_false = (
mov1_change_cnt_dmov_false - start_change_cnt_dmov_false
)
if num_change_cnt_dmov_true == 1 and num_change_cnt_dmov_false == 1:
timeToWait = 0.0
else:
print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {tc_no}:moveWait num_change_cnt_dmov_true={num_change_cnt_dmov_true} num_change_cnt_dmov_false={num_change_cnt_dmov_false} timeToWait={timeToWait:.2f}"
)
time.sleep(polltime)
timeToWait -= polltime

print(
f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S} {tc_no}:moveWait num_change_cnt_dmov_true={num_change_cnt_dmov_true} num_change_cnt_dmov_false={num_change_cnt_dmov_false} ret={ret}"
)
return ret and num_change_cnt_dmov_true == 1 and num_change_cnt_dmov_false == 1

def postMoveCheck(self, tc_no):
# Check the motor for the correct state at the end of move.
Expand Down

0 comments on commit 2530310

Please sign in to comment.