Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
fix(process): Deadlock removed.
Browse files Browse the repository at this point in the history
* enh(test): new tests to reproduce a deadlock
* cleanup(cmake.sh): cleanup in cmake.sh
* cleanup(process): namespaces not needed in several places.

REFS: MON-10980
  • Loading branch information
bouda1 committed Oct 7, 2021
1 parent da05536 commit 27d20ca
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 28 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
The logs mutex needs to be recursive, because sometimes we create logs, that log
informations.

*Deadlock removed*

This one is in the process class. When the process is stopping and we write to
its stdin, we could arrive to a deadlock.

## 20.10.3

### Bug fixes
Expand Down
33 changes: 16 additions & 17 deletions cmake.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ This program build Centreon-clib
EOF
}
force=0
BUILD_TYPE="Debug"
for i in "$@"
do
BUILD_TYPE=Debug
for i in "$@" ; do
case $i in
-f|--force)
force=1
Expand All @@ -37,7 +36,7 @@ done
my_id=$(id -u)

if [ -r /etc/centos-release ] ; then
maj="centos$(cat /etc/centos-release | awk '{print $4}' | cut -f1 -d'.')"
maj="centos$(awk '{print $4}' /etc/centos-release | cut -f1 -d'.')"
v=$(cmake --version)
if [[ $v =~ "version 3" ]] ; then
cmake='cmake'
Expand All @@ -62,22 +61,22 @@ if [ -r /etc/centos-release ] ; then
ninja-build
)
for i in "${pkgs[@]}"; do
if ! rpm -q $i ; then
if [ $maj = 'centos7' ] ; then
if ! rpm -q "$i" ; then
if [ "$maj" = 'centos7' ] ; then
yum install -y $i
else
dnf -y --enablerepo=PowerTools install $i
fi
fi
done
elif [ -r /etc/issue ] ; then
maj=$(cat /etc/issue | awk '{print $1}')
version=$(cat /etc/issue | awk '{print $3}')
maj=$(awk '{print $1}' /etc/issue)
version=$(awk '{print $3}' /etc/issue)
v=$(cmake --version)
if [[ $v =~ "version 3" ]] ; then
if [[ "$v" =~ "version 3" ]] ; then
cmake='cmake'
elif [ $maj = "Debian" ] ; then
if [ $version = "9" ] ; then
elif [ "$maj" = "Debian" ] ; then
if [ "$version" = "9" ] ; then
dpkg="dpkg"
else
dpkg="dpkg --no-pager"
Expand Down Expand Up @@ -110,8 +109,8 @@ elif [ -r /etc/issue ] ; then
fi
fi
done
elif [ $maj = "Raspbian" ] ; then
if [ $version = "9" ] ; then
elif [ "$maj" = "Raspbian" ] ; then
if [ "$version" = "9" ] ; then
dpkg="dpkg"
else
dpkg="dpkg --no-pager"
Expand All @@ -120,7 +119,7 @@ elif [ -r /etc/issue ] ; then
echo "Bad version of cmake..."
exit 1
else
if [ $my_id -eq 0 ] ; then
if [ "$my_id" -eq 0 ] ; then
apt install -y cmake
cmake='cmake'
else
Expand All @@ -136,7 +135,7 @@ elif [ -r /etc/issue ] ; then
)
for i in "${pkgs[@]}"; do
if ! $dpkg -l --no-pager $i | grep "^ii" ; then
if [ $my_id -eq 0 ] ; then
if [ "$my_id" -eq 0 ] ; then
apt install -y $i
else
echo -e "The package \"$i\" is not installed, you can install it, as root, with the command:\n\tapt install -y $i\n\n"
Expand All @@ -159,9 +158,9 @@ if [ "$force" = "1" ] ; then
mkdir build
fi
cd build
if [ $maj = "Raspbian" ] ; then
if [ "$maj" = "Raspbian" ] ; then
CXXFLAGS="-Wall -Wextra" $cmake -DWITH_PREFIX=/usr -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DWITH_PREFIX_LIB=/usr/lib -DWITH_TESTING=On -DUSE_CXX11_ABI=1 $* ..
elif [ $maj = "Debian" ] ; then
elif [ "$maj" = "Debian" ] ; then
CXXFLAGS="-Wall -Wextra" $cmake -DWITH_PREFIX=/usr -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DWITH_PREFIX_LIB=/usr/lib64 -DWITH_TESTING=On -DUSE_CXX11_ABI=1 $* ..
else
CXXFLAGS="-Wall -Wextra" $cmake -DWITH_PREFIX=/usr -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DWITH_PREFIX_LIB=/usr/lib64 -DWITH_TESTING=On -DUSE_CXX11_ABI=0 $* ..
Expand Down
13 changes: 13 additions & 0 deletions inc/com/centreon/process.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,20 @@ class process {
enum stream { in = 0, out = 1, err = 2 };

private:
/* Error buffer:
* * cleared by exec(),
* * content get and then cleared by read_err()
* * and filled by do_read() when an error in the process occures. This last
* method is called by the process manager in its main loop when needed.
*/
std::string _buffer_err;

/* Output buffer:
* * cleared by exec(),
* * content get and then cleared by read()
* * and filled by do_read() when an output in the process occures. This last
* method is called by the process manager in its main loop when needed.
*/
std::string _buffer_out;
pid_t (*_create_process)(char**, char**);
mutable std::condition_variable _cv_buffer_err;
Expand Down
30 changes: 19 additions & 11 deletions src/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void process::update_ending_process(int status) {
_end_time = timestamp::now();
_status = status;
_process = static_cast<pid_t>(-1);
_close(_stream[process::in]);
_close(_stream[in]);
if (!_is_running()) {
// Notify listener if necessary.
if (_listener) {
Expand Down Expand Up @@ -435,26 +435,33 @@ static std::string to_string(const char* data, size_t size) {
* @return Number of bytes actually written.
*/
unsigned int process::write(void const* data, unsigned int size) {
std::lock_guard<std::mutex> lock(_lock_process);
ssize_t wb = ::write(_stream[in], data, size);
int fd;
pid_t my_process;
{
std::lock_guard<std::mutex> lock(_lock_process);
fd = _stream[in];
my_process = _process;
}

ssize_t wb = ::write(fd, data, size);
if (wb < 0) {
char const* msg(strerror(errno));
if (errno == EINTR)
throw interruption_error() << msg;
throw basic_error() << "could not write '"
<< to_string(static_cast<const char*>(data), size)
<< "' on process " << _process << "'s input: " << msg;
<< "' on process " << my_process << "'s input: " << msg;
}
return wb;
}

void process::do_close(int fd) {
std::unique_lock<std::mutex> lock(_lock_process);
if (_stream[process::out] == fd) {
_close(_stream[process::out]);
if (_stream[out] == fd) {
_close(_stream[out]);
_cv_buffer_out.notify_one();
} else if (_stream[process::err] == fd) {
_close(_stream[process::err]);
} else if (_stream[err] == fd) {
_close(_stream[err]);
_cv_buffer_err.notify_one();
}
if (!_is_running()) {
Expand Down Expand Up @@ -636,10 +643,10 @@ void process::_pipe(int fds[2]) {
}

ssize_t process::do_read(int fd) {
std::unique_lock<std::mutex> lock(_lock_process);
// Read content of the stream and push it.
char buffer[4096];
ssize_t size = ::read(fd, buffer, sizeof(buffer));

if (size == -1) {
char const* msg(strerror(errno));
if (errno == EINTR)
Expand All @@ -650,15 +657,16 @@ ssize_t process::do_read(int fd) {
if (size == 0)
return 0;

if (_stream[process::out] == fd) {
std::unique_lock<std::mutex> lock(_lock_process);
if (_stream[out] == fd) {
_buffer_out.append(buffer, size);
_cv_buffer_out.notify_one();
// Notify listener if necessary.
if (_listener) {
lock.unlock();
(_listener->data_is_available)(*this);
}
} else if (_stream[process::err] == fd) {
} else if (_stream[err] == fd) {
_buffer_err.append(buffer, size);
_cv_buffer_err.notify_one();
// Notify listener if necessary.
Expand Down
1 change: 1 addition & 0 deletions src/process_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static int const DEFAULT_TIMEOUT = 200;
process_manager::process_manager()
: _update(true), _running{false}, _thread{&process_manager::_run, this} {
std::unique_lock<std::mutex> lck(_running_m);
pthread_setname_np(_thread.native_handle(), "clib_prc_mgr");
_running_cv.wait(lck, [this]() -> bool { return _running; });
}

Expand Down
14 changes: 14 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ target_link_libraries("${TEST_NAME}" "${DEFAULT_LINK_NAME}")
add_test("${TEST_NAME}_stderr" "${TEST_NAME}" "err")
add_test("${TEST_NAME}_stdout" "${TEST_NAME}" "out")

# process_output_mt
set(TEST_NAME "process_output_mt")
add_executable("${TEST_NAME}" "${TEST_DIR}/${TEST_NAME}.cc")
target_link_libraries("${TEST_NAME}" "${DEFAULT_LINK_NAME}")
add_test("${TEST_NAME}_stderr" "${TEST_NAME}" "err")
add_test("${TEST_NAME}_stdout" "${TEST_NAME}" "out")

# process_output_mt1
set(TEST_NAME "process_output_mt1")
add_executable("${TEST_NAME}" "${TEST_DIR}/${TEST_NAME}.cc")
target_link_libraries("${TEST_NAME}" "${DEFAULT_LINK_NAME}")
add_test("${TEST_NAME}_stderr" "${TEST_NAME}" "err")
add_test("${TEST_NAME}_stdout" "${TEST_NAME}" "out")

# task_manager tests
set(TASK_MANAGER_LIST
task_manager_add
Expand Down
113 changes: 113 additions & 0 deletions test/process_output_mt.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
** Copyright 2021 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
**
** For more information : [email protected]
*/

#include <cstdlib>
#include <cstring>
#include <future>
#include <iostream>
#include <string>
#include "com/centreon/clib.hh"
#include "com/centreon/exceptions/basic.hh"
#include "com/centreon/process.hh"

using namespace com::centreon;

/**
* Check class process (read stdout/stderr).
*
* @return EXIT_SUCCESS on success.
*/
int main(int argc, char** argv) {
int ret(EXIT_SUCCESS);
try {
if (argc != 2 || (strcmp(argv[1], "err") && strcmp(argv[1], "out"))) {
std::cerr << "usage: " << argv[0] << " err|out" << std::endl;
return EXIT_FAILURE;
}

std::string cmd("./bin_test_process_output check_output ");
cmd += argv[1];

static constexpr size_t size = 10 * 1024;
static constexpr size_t count = 30;

process p;
std::promise<bool> prm;
std::future<bool> running = prm.get_future();
std::thread th_exec([&p, &cmd, &prm] {
p.exec(cmd);
prm.set_value(true);
});

running.get();

std::thread th_write([&p] {
char buffer_write[size + 1];
buffer_write[size] = 0;
for (unsigned int i = 0; i < size; ++i)
buffer_write[i] = static_cast<char>('A' + i % 26);

for (int i = 0; i < count; i++) {
unsigned int total_write = 0;
do {
if (total_write < size)
total_write +=
p.write(buffer_write + total_write, size - total_write);
} while (total_write < size);
}
std::cout << "W ";
for (int i = 0; i < count; i++)
std::cout << buffer_write;
std::cout << std::endl;
});

std::string buffer_read;
do {
std::string data;
if (!strcmp(argv[1], "out"))
p.read(data);
else
p.read_err(data);
buffer_read += data;
} while (buffer_read.size() < size * count);

th_exec.join();
th_write.join();
p.update_ending_process(0);
p.wait();

std::cout << "R " << buffer_read << std::endl;
if (buffer_read.size() != size * count)
throw basic_error() << "read data with wrong length";

for (int i = 0, j = 0; i < size * count; i++) {
if (static_cast<char>('A' + j % 26) != buffer_read[i])
throw basic_error() << "bad character in the read data at position " << i;
if (++j == size)
j = 0;
}

if (p.exit_code() != EXIT_SUCCESS)
throw basic_error() << "invalid return";
}
catch (std::exception const& e) {
ret = EXIT_FAILURE;
std::cerr << "error: " << e.what() << std::endl;
}
return ret;
}
Loading

0 comments on commit 27d20ca

Please sign in to comment.