Skip to content

Commit

Permalink
Add logging to zmq server for wind farm control
Browse files Browse the repository at this point in the history
  • Loading branch information
abhineet-gupta committed Nov 17, 2023
1 parent a398ced commit 85e2f53
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
7 changes: 4 additions & 3 deletions Examples/17b_zeromq_multi_openfast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
DESIRED_YAW_OFFSET = [-10, 10]


def run_zmq():
def run_zmq(logfile):
"""Start the ZeroMQ server for wind farm control"""

# Start the server at the following address
network_address = "tcp://*:5555"
server = wfc_zmq_server(network_address, timeout=60.0, verbose=True)
server = wfc_zmq_server(network_address, timeout=60.0, verbose=True, logfile = logfile)

# Provide the wind farm control algorithm as the wfc_controller method of the server
server.wfc_controller = wfc_controller
Expand Down Expand Up @@ -93,7 +93,8 @@ def sim_openfast_2():
if __name__ == "__main__":
# Start wind farm control server and two openfast simulation
# as separate processes
p0 = mp.Process(target=run_zmq)
logfile = os.path.join(example_out_dir,os.path.splitext(os.path.basename(__file__))[0]+'.log')
p0 = mp.Process(target=run_zmq,args=(logfile,))
p1 = mp.Process(target=sim_openfast_1)
p2 = mp.Process(target=sim_openfast_2)

Expand Down
58 changes: 52 additions & 6 deletions ROSCO_toolbox/control_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@
import numpy as np
import platform, ctypes, os
import zmq
import logging
from ROSCO_toolbox.ofTools.util.FileTools import load_yaml



logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)



# Some useful constants
deg2rad = np.deg2rad(1)
rad2deg = np.rad2deg(1)
Expand Down Expand Up @@ -331,7 +339,7 @@ class wfc_zmq_server:
)
wfc_interface = load_yaml(interface_file)

def __init__(self, network_address="tcp://*:5555", timeout=600.0, verbose=False):
def __init__(self, network_address="tcp://*:5555", timeout=600.0, verbose=False,logfile=[]):
"""Instanciate the server"""
self.network_address = network_address
self.timeout = timeout
Expand All @@ -345,6 +353,19 @@ def __init__(self, network_address="tcp://*:5555", timeout=600.0, verbose=False)
print(
f"Successfully established connection a ZeroMQ server at {network_address}"
)

if logfile is not None:
print(logfile)
logger_filehandler = logging.FileHandler(logfile, "w+")
logger_filehandler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
logger.addHandler(logger_filehandler)
else:
logging.disable()

logger.info(
f"Successfully established connection a ZeroMQ server at {network_address}"
)


def runserver(self):
"""Run the server to get measurements and send setpoints to ROSCO"""
Expand All @@ -363,11 +384,13 @@ def runserver(self):
self.connections._update_measurements(id, measurements)
try:
# Try to get the setpoints of the turbine
logger.debug(f"Trying to get setpoints for id = {id}")
self._get_setpoints(id, measurements)
except NotImplementedError as e:
# Disconnect from the server and raise an error
# if the user has not defined a wind farm controller
self._disconnect()
logger.critical(f'Disconnected due to wfc_controller not being defined by the user')
raise e
else:
# If setpoints are successfully read then
Expand All @@ -376,6 +399,7 @@ def runserver(self):

# Check if there are no clients connected to the server
# and if so, disconnect the server
logger.debug('Checking for disconnect')
connect_zmq = self._check_for_disconnect()

def _get_setpoints(self, id, measurements):
Expand All @@ -384,9 +408,15 @@ def _get_setpoints(self, id, measurements):
Gets the setpoint for the current turbine at the current time step
"""
current_time = self.connections.measurements[id]["Time"]
logger.debug(
f"Asking wfc_controller for setpoints at time = {current_time} for id = {id}"
)
setpoints = self.wfc_controller(id, current_time, measurements)
logger.info(f"Received setpoints {setpoints} from wfc_controller for time = {current_time} and id = {id}")

for s in self.wfc_interface["setpoints"]:
self.connections.setpoints[id][s] = setpoints.get(s, 0)
logger.debug(f'Set setpoint {s} in the connections list to {setpoints.get(s,0)} for id = {id}')

def wfc_controller(self, id, current_time, measurements):
"""User defined wind farm controller
Expand Down Expand Up @@ -417,6 +447,7 @@ def wfc_controller(self, id, current_time, measurements):
>>> # Overwrite the wfc_controller method of the server
>>> server.wfc_controller = wfc_controller
"""
logger.critical("User defined wind farm controller not found")
raise NotImplementedError("Wind farm controller needs to be defined.")

def _get_measurements(self):
Expand All @@ -434,11 +465,16 @@ def _get_measurements(self):
if self.socket in dict(events):
# if poller.poll(timeout_ms):
# Receive measurements over network protocol
logger.debug(
f"Checked for timeout and waiting for measurements from a ROSCO client"
)
message_in = self.socket.recv_string()
logger.debug(f"Received raw message: {message_in} ")
else:
# raise IOError("[%s] Connection to '%s' timed out."
# % (self.identifier, self.network_address))
raise IOError("Connection to timed out.")
logger.info(f"Connection timed out")
raise IOError("Connection timed out")

# Convert to individual strings and then to floats
meas_float = message_in
Expand All @@ -449,6 +485,7 @@ def _get_measurements(self):
meas_dict = {}
for i_meas, meas in enumerate(self.wfc_interface["measurements"]):
meas_dict[meas] = meas_float[i_meas]
logger.info(f"Received message (formatted): {meas_dict}")
if self.verbose:
print("[%s] Measurements received:", meas_dict)

Expand All @@ -463,39 +500,46 @@ def _send_setpoints(self, id):
).encode("utf-8")

# Send reply back to client

logger.debug(f"Raw setpoints to be sent to id = {id} is {message_out}")
if self.verbose:
print("[%s] Sending setpoint string to ROSCO: %s." % (id, message_out))

# Send control setpoints over network protocol
self.socket.send(message_out)
logger.info(f"Sent setpoints {self.connections.setpoints[id]} to id = {id}")

if self.verbose:
print("[%s] Setpoints sent successfully." % id)

def _check_for_disconnect(self):
"""Disconnect if no clients are connected to the server"""
num_connected = sum(self.connections.connected.values())
logger.debug(f'Still connected to {num_connected} clients')
if num_connected > 0:
connect_zmq = True
if self.verbose:
print("Still connected to ", num_connected, " ROSCO clients")
else:
connect_zmq = False
logger.info('Shutting down server as all the clients have dropped off')
self._disconnect()
return connect_zmq

def _disconnect(self):
"""Disconnect from zmq server"""
logger.info('Socket terminated')
self.socket.close()
context = zmq.Context()
context.term()


class wfc_zmq_connections:
"""
This class is used to track the current ROSCO client connections,
This class is used to track the current ROSCO client connections,
their current measurements and setpoints.
"""

# Dictionary of ROSCO clients connected to the server
connected = {}

Expand All @@ -513,6 +557,8 @@ def _add_unique(self, id):
"""
if id not in wfc_zmq_connections.connected.keys():
wfc_zmq_connections.connected.update({id: True})
logger.info(f"Connected to a new ROSCO with id = {id}")

self.setpoints.update(
{id: {s: 0.0 for s in self.wfc_interface["setpoints"]}}
) # init setpoints with zeros
Expand All @@ -521,9 +567,9 @@ def _add_unique(self, id):
) # init measurements with zeros

def _update_measurements(self, id, measurements):
"""Update the measurements and remove turbine from connected clients
"""
"""Update the measurements and remove turbine from connected clients"""
self.measurements.update({id: measurements})
logger.debug(f"Updated measurements for ROSCO with id = {id} ")
if measurements["iStatus"] == -1:
wfc_zmq_connections.connected[id] = False
logger.info(f"Received disconnect signal from ROSCO with id = {id}")

0 comments on commit 85e2f53

Please sign in to comment.