Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align reef commit to ds #961

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Globals
VERSION="1.2.14"
VERSION="1.2.16"
CEPH_VERSION="18.2.2"
SPDK_VERSION="24.01"
CONTAINER_REGISTRY="quay.io/ceph"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ jobs:
strategy:
fail-fast: false
matrix:
test: ["sanity", "state_transitions", "state_transitions_both_gws", "state_transitions_loop", "state_transitions_rand_loop", "late_registration", "late_registration_loop", "4gws", "4gws_loop", "namespaces", "namespaces_loop"]
test: ["sanity", "state_transitions", "state_transitions_both_gws", "state_transitions_loop", "state_transitions_rand_loop", "late_registration", "late_registration_loop", "4gws", "4gws_loop", "namespaces", "namespaces_loop", "mtls"]
runs-on: ubuntu-latest
env:
HUGEPAGES: 1024 # 4 spdk instances
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ echo $NVMEOF_VERSION...

For testing purposes, self signed certificates and keys can be generated locally using OpenSSL.

For the server, generate credentials for server name 'my.server' in files called server.key and server.crt:
For the server, generate credentials for the server named 'my.server' and save them in files called server.key and server.crt. Additionally, specify subject alternative names using the gateway group nodes' IPs in the openssl command.

```bash
$ openssl req -x509 -newkey rsa:4096 -nodes -keyout server.key -out server.crt -days 3650 -subj '/CN=my.server'
$ openssl req -x509 -newkey rsa:4096 -nodes -keyout server.key -out server.crt -days 3650 -subj '/CN=my.server' -addext "subjectAltName=IP:192.168.13.3,IP:192.168.13.4,IP:192.168.13.5,IP:192.168.13.6"
```

For client:
Expand Down
56 changes: 46 additions & 10 deletions control/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class DiscoveryLogEntry(AutoSerializableStructure):
class DiscoveryService:
"""Implements discovery controller.

Response discover request from initiator.
Responds to discovery requests from initiators; instances must be used from within a "with" block.

Instance attributes:
version: Discovery controller version
Expand Down Expand Up @@ -329,10 +329,49 @@ def __init__(self, config):
assert 0
self.logger.info(f"discovery addr: {self.discovery_addr} port: {self.discovery_port}")

self.sock = None
self.conn_vals = {}
self.connection_counter = 1
self.selector = selectors.DefaultSelector()

def __enter__(self):
    """Enter the context manager.

    Returns:
        DiscoveryService: this instance, so it can be used in a
        ``with`` statement; cleanup happens in ``__exit__``.
    """
    return self

def __exit__(self, exc_type, exc_value, traceback):
    """Release all resources held by the discovery service.

    Cleans up the OMAP state, unregisters and closes every initiator
    connection, shuts down the listening socket, and finally closes
    the selector. Cleanup errors are deliberately swallowed so that
    teardown always runs to completion.

    Bug fix: the original handlers used ``except Except``, which is
    not a defined name — any error during unregister/close would have
    raised a NameError instead of being ignored.
    """
    if self.omap_state:
        self.omap_state.cleanup_omap()
        self.omap_state = None

    if self.selector:
        with self.lock:
            for key in self.conn_vals:
                try:
                    self.selector.unregister(self.conn_vals[key].connection)
                except Exception:
                    # connection may already be unregistered
                    pass
                try:
                    self.conn_vals[key].connection.close()
                except Exception:
                    pass
            self.conn_vals = {}

        if self.sock:
            try:
                self.selector.unregister(self.sock)
            except Exception:
                pass
            try:
                self.sock.close()
            except Exception:
                pass
            self.sock = None

        try:
            self.selector.close()
        except Exception:
            pass
        self.selector = None

def _read_all(self) -> Dict[str, str]:
"""Reads OMAP and returns dict of all keys and values."""

Expand Down Expand Up @@ -1068,11 +1107,11 @@ def update_log_level(self):
def start_service(self):
"""Enable listening on the server side."""

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((self.discovery_addr, int(self.discovery_port)))
sock.listen(MAX_CONNECTION)
sock.setblocking(False)
self.selector.register(sock, selectors.EVENT_READ, self.nvmeof_accept)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind((self.discovery_addr, int(self.discovery_port)))
self.sock.listen(MAX_CONNECTION)
self.sock.setblocking(False)
self.selector.register(self.sock, selectors.EVENT_READ, self.nvmeof_accept)
self.logger.debug("waiting for connection...")
t = threading.Thread(target=self.handle_timeout)
t.start()
Expand All @@ -1090,10 +1129,7 @@ def start_service(self):
callback = key.data
callback(key.fileobj, mask)
except KeyboardInterrupt:
for key in self.conn_vals:
self.conn_vals[key].connection.close()
self.selector.close()
self.logger.debug("received a ctrl+C interrupt. exiting...")
self.logger.debug("received a ctrl+C interrupt. exiting...")

def main(args=None):
parser = argparse.ArgumentParser(prog="python3 -m control",
Expand Down
37 changes: 21 additions & 16 deletions control/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,8 @@ def set_group_id(self, id: int):

def _wait_for_group_id(self):
"""Waits for the monitor notification of this gateway's group id"""
# Python 3.8: Default value of max_workers is min(32, os.cpu_count() + 4).
# This default value preserves at least 5 workers for I/O bound tasks. It utilizes at
# most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using
# very large resources implicitly on many-core machines.
self.monitor_server = grpc.server(futures.ThreadPoolExecutor())
self.monitor_server = self._grpc_server(self._monitor_address())
monitor_pb2_grpc.add_MonitorGroupServicer_to_server(MonitorGroupService(self.set_group_id), self.monitor_server)
self.monitor_server.add_insecure_port(self._monitor_address())
self.monitor_server.start()
self.logger.info(f"MonitorGroup server is listening on {self._monitor_address()} for group id")
self.monitor_event.wait()
Expand Down Expand Up @@ -182,12 +177,9 @@ def serve(self):
gateway_state = GatewayStateHandler(self.config, local_state, omap_state, self.gateway_rpc_caller, f"gateway-{self.name}")
omap_lock = OmapLock(omap_state, gateway_state, self.rpc_lock)
self.gateway_rpc = GatewayService(self.config, gateway_state, self.rpc_lock, omap_lock, self.group_id, self.spdk_rpc_client, self.ceph_utils)
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
self.server = self._grpc_server(self._gateway_address())
pb2_grpc.add_GatewayServicer_to_server(self.gateway_rpc, self.server)

# Add listener port
self._add_server_listener()

# Check for existing NVMeoF target state
gateway_state.start_update()

Expand Down Expand Up @@ -237,6 +229,11 @@ def _start_monitor_client(self):
'-c', '/etc/ceph/ceph.conf',
'-n', rados_id,
'-k', '/etc/ceph/keyring']
if self.config.getboolean("gateway", "enable_auth"):
cmd += [
"--server-cert", self.config.get("mtls", "server_cert"),
"--client-key", self.config.get("mtls", "client_key"),
"--client-cert", self.config.get("mtls", "client_cert") ]
self.logger.info(f"Starting {' '.join(cmd)}")
try:
# start monitor client process
Expand Down Expand Up @@ -287,8 +284,14 @@ def _monitor_address(self):
monitor_addr = GatewayUtils.escape_address_if_ipv6(monitor_addr)
return "{}:{}".format(monitor_addr, monitor_port)

def _add_server_listener(self):
"""Adds listener port to server."""
def _grpc_server(self, address):
"""Construct grpc server"""

# Python 3.8: Default value of max_workers is min(32, os.cpu_count() + 4).
# This default value preserves at least 5 workers for I/O bound tasks. It utilizes at
# most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using
# very large resources implicitly on many-core machines.
server = grpc.server(futures.ThreadPoolExecutor())

enable_auth = self.config.getboolean("gateway", "enable_auth")
if enable_auth:
Expand All @@ -315,11 +318,13 @@ def _add_server_listener(self):
)

# Add secure port using credentials
self.server.add_secure_port(
self._gateway_address(), server_credentials)
server.add_secure_port(
address, server_credentials)
else:
# Authentication is not enabled
self.server.add_insecure_port(self._gateway_address())
server.add_insecure_port(address)

return server

def _get_spdk_rpc_socket_path(self, omap_state) -> str:
# For backward compatibility, try first to get the old attribute
Expand Down Expand Up @@ -461,7 +466,7 @@ def _stop_discovery(self):
try:
os.kill(self.discovery_pid, signal.SIGINT)
os.waitpid(self.discovery_pid, 0)
except ChildProcessError:
except (ChildProcessError, ProcessLookupError):
pass # ignore
self.logger.info("Discovery service terminated")

Expand Down
16 changes: 16 additions & 0 deletions control/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,22 @@ def _watcher_callback(notify_id, notifier_id, watch_id, data):
else:
self.logger.info(f"Watch already exists.")

def cleanup_omap(self):
    """Release the OMAP watch and the Rados I/O context on shutdown.

    Each resource is closed best-effort: the attribute is reset to
    ``None`` only after a successful close, and close failures are
    silently ignored.
    """
    self.logger.info(f"Cleanup OMAP on exit ({self.id_text})")
    # (attribute name, success-log prefix) pairs, closed in order.
    for attr, done_msg in (("watch", "Unregistered watch"),
                           ("ioctx", "Closed Rados connection")):
        resource = getattr(self, attr)
        if not resource:
            continue
        try:
            resource.close()
            self.logger.debug(f"{done_msg} ({self.id_text})")
            setattr(self, attr, None)
        except Exception:
            pass

class GatewayStateHandler:
"""Maintains consistency in NVMeoF target state store instances.
Expand Down
6 changes: 6 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ services:
sh -c './vstart.sh --new $$CEPH_VSTART_ARGS &&
ceph osd pool create rbd &&
echo ceph dashboard nvmeof-gateway-add -i <(echo nvmeof-devel:5500) nvmeof.1 &&
pushd /etc/ceph &&
openssl req -x509 -newkey rsa:4096 -nodes -keyout server.key -out server.crt -days 3650 -subj /CN=my.server -addext "subjectAltName = IP:192.168.13.3, IP:0.0.0.0" &&
openssl req -x509 -newkey rsa:4096 -nodes -keyout client.key -out client.crt -days 3650 -subj /CN=client1 &&
popd &&
sleep infinity'
healthcheck:
test: ceph osd pool stats rbd
Expand Down Expand Up @@ -240,6 +244,8 @@ services:
HUGEPAGES_DIR:
labels:
io.ceph.nvmeof:
volumes:
- ceph-conf:/etc/ceph
volumes:
ceph-conf:
networks:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "pdm.backend"

[project]
name = "ceph-nvmeof"
version = "1.2.14"
version = "1.2.16"
description = "Service to provide Ceph storage over NVMe-oF protocol"
readme = "README.md"
requires-python = "~=3.9"
Expand Down
78 changes: 78 additions & 0 deletions tests/ceph-nvmeof.tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2021 International Business Machines
# All rights reserved.
#
# SPDX-License-Identifier: LGPL-3.0-or-later
#
# Authors: [email protected], [email protected]
#

[gateway]
name =
group =
addr = 192.168.13.3
port = 5500
enable_auth = True
state_update_notify = True
state_update_interval_sec = 5
enable_spdk_discovery_controller = False
#omap_file_lock_duration = 20
#omap_file_lock_retries = 30
#omap_file_lock_retry_sleep_interval = 1.0
#omap_file_update_reloads = 10
#enable_prometheus_exporter = True
#prometheus_exporter_ssl = True
#prometheus_port = 10008
#prometheus_bdev_pools = rbd
#prometheus_stats_interval = 10
#verify_nqns = True
#allowed_consecutive_spdk_ping_failures = 1
#spdk_ping_interval_in_seconds = 2.0
#ping_spdk_under_lock = False

[gateway-logs]
log_level=debug
#log_files_enabled = True
#log_files_rotation_enabled = True
#verbose_log_messages = True
#max_log_file_size_in_mb=10
#max_log_files_count=20
#max_log_directory_backups=10
#
# Notice that if you change the log directory the log files will only be visible inside the container
#
#log_directory = /var/log/ceph/

[discovery]
addr = 0.0.0.0
port = 8009

[ceph]
pool = rbd
config_file = /etc/ceph/ceph.conf

[mtls]
server_key = /etc/ceph/server.key
client_key = /etc/ceph/client.key
server_cert = /etc/ceph/server.crt
client_cert = /etc/ceph/client.crt

[spdk]
bdevs_per_cluster = 32
tgt_path = /usr/local/bin/nvmf_tgt
#rpc_socket_dir = /var/tmp/
#rpc_socket_name = spdk.sock
#tgt_cmd_extra_args = --env-context="--no-huge -m1024" --iova-mode=va
timeout = 60.0
log_level = WARNING

# Example value: -m 0x3 -L all
# tgt_cmd_extra_args =

# transports = tcp

# Example value: {"max_queue_depth" : 16, "max_io_size" : 4194304, "io_unit_size" : 1048576, "zcopy" : false}
transport_tcp_options = {"in_capsule_data_size" : 8192, "max_io_qpairs_per_ctrlr" : 7}

[monitor]
#timeout = 1.0
15 changes: 14 additions & 1 deletion tests/ha/4gws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,20 @@ expect_optimized() {
EXPECTED_OPTIMIZED=$2
NQN=$3

socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock)
socket_retries=0
socket=""
while [ $socket_retries -lt 10 ] ; do
socket=$(docker exec "$GW_NAME" find /var/run/ceph -name spdk.sock)
if [ -n "$socket" ]; then
break
fi
socket_retries=$(expr $socket_retries + 1)
sleep 1
done
if [ -z "$socket" ]; then
exit 1 # failed
fi

# Verify expected number of "optimized"
for i in $(seq 50); do
response=$(docker exec "$GW_NAME" "$rpc" "-s" "$socket" "$cmd" "$NQN")
Expand Down
1 change: 1 addition & 0 deletions tests/ha/mtls.sh
11 changes: 4 additions & 7 deletions tests/ha/sanity.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ set -xe
# See
# - https://github.com/spdk/spdk/blob/master/doc/jsonrpc.md
# - https://spdk.io/doc/nvmf_multipath_howto.html
. .env
container_ip() {
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$1"
}

GW1_NAME=$(docker ps --format '{{.ID}}\t{{.Names}}' | awk '$2 ~ /nvmeof/ && $2 ~ /1/ {print $1}')
ip="$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$GW1_NAME")"
echo -n "ℹ️ Starting bdevperf container"
docker-compose up -d bdevperf
sleep 10
echo "ℹ️ bdevperf start up logs"
make logs SVC=bdevperf
eval $(make run SVC=bdevperf OPTS="--entrypoint=env" | grep BDEVPERF_SOCKET | tr -d '\n\r' )
BDEVPERF_SOCKET=/tmp/bdevperf.sock
NVMEOF_DISC_PORT=8009


ip=$(container_ip $GW1)
echo "ℹ️ Using discovery service in gateway $GW1 ip $ip"
rpc="/usr/libexec/spdk/scripts/rpc.py"
echo "ℹ️ bdevperf bdev_nvme_set_options"
Expand Down
13 changes: 13 additions & 0 deletions tests/ha/setup_mtls.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Set up an mTLS-secured gateway for the HA tests: create a subsystem,
# add a namespace, a listener and a wildcard host entry, all via the CLI.
# Every CLI invocation authenticates with mTLS, using the server/client
# certificates generated into /etc/ceph by the ceph service container.
set -xe

# Locate the first nvmeof gateway container (name matches /nvmeof/ and /1/)
# and resolve its IP address on the docker network.
GW1_NAME=$(docker ps --format '{{.ID}}\t{{.Names}}' | awk '$2 ~ /nvmeof/ && $2 ~ /1/ {print $1}')
GW1_IP="$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$GW1_NAME")"
NQN="nqn.2016-06.io.spdk:cnode1"

# --server-cert verifies the gateway's identity; --client-key/--client-cert
# authenticate this client to the gateway (mutual TLS).
docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt subsystem add --subsystem $NQN
docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt namespace add --subsystem $NQN --rbd-pool rbd --rbd-image demo_image1 --size 10M --rbd-create-image -l 1
#docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt namespace add --subsystem $NQN --rbd-pool rbd --rbd-image demo_image2 --size 10M --rbd-create-image -l 2
docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt listener add --subsystem $NQN --host-name $GW1_NAME --traddr $GW1_IP --trsvcid 4420
docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt host add --subsystem $NQN --host "*"
# Dump the resulting subsystem state to verify the setup succeeded.
docker-compose run --rm nvmeof-cli --server-address $GW1_IP --server-port 5500 --server-cert /etc/ceph/server.crt --client-key /etc/ceph/client.key --client-cert /etc/ceph/client.crt get_subsystems

Loading
Loading