diff --git a/CMakeLists.txt b/CMakeLists.txt index 3c73267d..7a5276e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,23 @@ project(amqpcpp) include(set_cxx_norm.cmake) set_cxx_norm (${CXX_NORM_CXX11}) + +# get version from TAG on git +include(GetGitRevisionDescription.cmake) +git_describe(VERSION --tags) + +# parse the version information into pieces. +# http://ipenguin.ws/2012/11/cmake-automatically-use-git-tags-as.html +string(REGEX REPLACE "^v([0-9]+)\\..*" "\\1" VERSION_MAJOR "${VERSION}") +string(REGEX REPLACE "^v[0-9]+\\.([0-9]+).*" "\\1" VERSION_MINOR "${VERSION}") +string(REGEX REPLACE "^v[0-9]+\\.[0-9]+\\.([0-9]+).*" "\\1" VERSION_PATCH "${VERSION}") +string(REGEX REPLACE "^v[0-9]+\\.[0-9]+\\.[0-9]+(.*)" "\\1" VERSION_SHA1 "${VERSION}") +set(VERSION_SHORT "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_PATCH}") +set(VERSION_SONAME "${VERSION_MAJOR}.${VERSION_MINOR}") + +configure_file("${CMAKE_CURRENT_SOURCE_DIR}/Makefile.in" "${CMAKE_CURRENT_SOURCE_DIR}/Makefile") + + macro (add_sources) file (RELATIVE_PATH _relPath "${PROJECT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}") foreach (_src ${ARGN}) @@ -27,17 +44,32 @@ add_subdirectory(include) option(BUILD_SHARED "build shared library" OFF) if(BUILD_SHARED) - add_library(amqpcpp SHARED ${SRCS}) - set_target_properties(amqpcpp PROPERTIES SOVERSION 2.7) - install(TARGETS amqpcpp + add_library(${PROJECT_NAME} SHARED ${SRCS}) + set_target_properties(${PROJECT_NAME} PROPERTIES SOVERSION ${VERSION_SONAME}) + install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION lib ) else() - add_library(amqpcpp STATIC ${SRCS}) - install(TARGETS amqpcpp + add_library(${PROJECT_NAME} STATIC ${SRCS}) + install(TARGETS ${PROJECT_NAME} ARCHIVE DESTINATION lib ) endif() + +# if UNIX build package file +if(UNIX) + set(DEST_DIR "${CMAKE_INSTALL_PREFIX}") + + include(FindPkgConfig QUIET) + if(PKG_CONFIG_FOUND) + # Produce a pkg-config file for linking against the shared lib + configure_file("${CMAKE_CURRENT_SOURCE_DIR}/amqpcpp.pc.in" "${CMAKE_CURRENT_SOURCE_DIR}/amqpcpp.pc" @ONLY) + install(FILES "${CMAKE_CURRENT_SOURCE_DIR}/amqpcpp.pc" + DESTINATION "${CMAKE_INSTALL_PREFIX}/lib/pkgconfig") + endif() +endif() + + Include_directories(${PROJECT_SOURCE_DIR}) install(DIRECTORY include/ DESTINATION include/amqpcpp FILES_MATCHING PATTERN "*.h") diff --git a/GetGitRevisionDescription.cmake b/GetGitRevisionDescription.cmake new file mode 100644 index 00000000..8ab03bc5 --- /dev/null +++ b/GetGitRevisionDescription.cmake @@ -0,0 +1,168 @@ +# - Returns a version string from Git +# +# These functions force a re-configure on each git commit so that you can +# trust the values of the variables in your build system. +# +# get_git_head_revision( [ ...]) +# +# Returns the refspec and sha hash of the current head revision +# +# git_describe( [ ...]) +# +# Returns the results of git describe on the source tree, and adjusting +# the output so that it tests false if an error occurs. +# +# git_get_exact_tag( [ ...]) +# +# Returns the results of git describe --exact-match on the source tree, +# and adjusting the output so that it tests false if there was no exact +# matching tag. +# +# git_local_changes() +# +# Returns either "CLEAN" or "DIRTY" with respect to uncommitted changes. +# Uses the return code of "git diff-index --quiet HEAD --". +# Does not regard untracked files. +# +# Requires CMake 2.6 or newer (uses the 'function' command) +# +# Original Author: +# 2009-2010 Ryan Pavlik +# http://academic.cleardefinition.com +# Iowa State University HCI Graduate Program/VRAC +# +# Copyright Iowa State University 2009-2010. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +if(__get_git_revision_description) + return() +endif() +set(__get_git_revision_description YES) + +# We must run the following at "include" time, not at function call time, +# to find the path to this module rather than the path to a calling list file +get_filename_component(_gitdescmoddir ${CMAKE_CURRENT_LIST_FILE} PATH) + +function(get_git_head_revision _refspecvar _hashvar) + set(GIT_PARENT_DIR "${CMAKE_CURRENT_SOURCE_DIR}") + set(GIT_DIR "${GIT_PARENT_DIR}/.git") + while(NOT EXISTS "${GIT_DIR}") # .git dir not found, search parent directories + set(GIT_PREVIOUS_PARENT "${GIT_PARENT_DIR}") + get_filename_component(GIT_PARENT_DIR ${GIT_PARENT_DIR} PATH) + if(GIT_PARENT_DIR STREQUAL GIT_PREVIOUS_PARENT) + # We have reached the root directory, we are not in git + set(${_refspecvar} "GITDIR-NOTFOUND" PARENT_SCOPE) + set(${_hashvar} "GITDIR-NOTFOUND" PARENT_SCOPE) + return() + endif() + set(GIT_DIR "${GIT_PARENT_DIR}/.git") + endwhile() + # check if this is a submodule + if(NOT IS_DIRECTORY ${GIT_DIR}) + file(READ ${GIT_DIR} submodule) + string(REGEX REPLACE "gitdir: (.*)\n$" "\\1" GIT_DIR_RELATIVE ${submodule}) + get_filename_component(SUBMODULE_DIR ${GIT_DIR} PATH) + get_filename_component(GIT_DIR ${SUBMODULE_DIR}/${GIT_DIR_RELATIVE} ABSOLUTE) + endif() + set(GIT_DATA "${CMAKE_CURRENT_BINARY_DIR}/CMakeFiles/git-data") + if(NOT EXISTS "${GIT_DATA}") + file(MAKE_DIRECTORY "${GIT_DATA}") + endif() + + if(NOT EXISTS "${GIT_DIR}/HEAD") + return() + endif() + set(HEAD_FILE "${GIT_DATA}/HEAD") + configure_file("${GIT_DIR}/HEAD" "${HEAD_FILE}" COPYONLY) + + configure_file("${_gitdescmoddir}/GetGitRevisionDescription.cmake.in" + "${GIT_DATA}/grabRef.cmake" + @ONLY) + include("${GIT_DATA}/grabRef.cmake") + + set(${_refspecvar} "${HEAD_REF}" PARENT_SCOPE) + set(${_hashvar} "${HEAD_HASH}" PARENT_SCOPE) +endfunction() + +function(git_describe _var) + if(NOT GIT_FOUND) + find_package(Git QUIET) + endif() + get_git_head_revision(refspec hash) + if(NOT GIT_FOUND) + set(${_var} "GIT-NOTFOUND" PARENT_SCOPE) + return() + endif() + if(NOT hash) + set(${_var} "HEAD-HASH-NOTFOUND" PARENT_SCOPE) + return() + endif() + + # TODO sanitize + #if((${ARGN}" MATCHES "&&") OR + # (ARGN MATCHES "||") OR + # (ARGN MATCHES "\\;")) + # message("Please report the following error to the project!") + # message(FATAL_ERROR "Looks like someone's doing something nefarious with git_describe! Passed arguments ${ARGN}") + #endif() + + #message(STATUS "Arguments to execute_process: ${ARGN}") + + execute_process(COMMAND + "${GIT_EXECUTABLE}" + describe + ${hash} + ${ARGN} + WORKING_DIRECTORY + "${CMAKE_CURRENT_SOURCE_DIR}" + RESULT_VARIABLE + res + OUTPUT_VARIABLE + out + ERROR_QUIET + OUTPUT_STRIP_TRAILING_WHITESPACE) + if(NOT res EQUAL 0) + set(out "${out}-${res}-NOTFOUND") + endif() + + set(${_var} "${out}" PARENT_SCOPE) +endfunction() + +function(git_get_exact_tag _var) + git_describe(out --exact-match ${ARGN}) + set(${_var} "${out}" PARENT_SCOPE) +endfunction() + +function(git_local_changes _var) + if(NOT GIT_FOUND) + find_package(Git QUIET) + endif() + get_git_head_revision(refspec hash) + if(NOT GIT_FOUND) + set(${_var} "GIT-NOTFOUND" PARENT_SCOPE) + return() + endif() + if(NOT hash) + set(${_var} "HEAD-HASH-NOTFOUND" PARENT_SCOPE) + return() + endif() + + execute_process(COMMAND + "${GIT_EXECUTABLE}" + diff-index --quiet HEAD -- + WORKING_DIRECTORY + "${CMAKE_CURRENT_SOURCE_DIR}" + RESULT_VARIABLE + res + OUTPUT_VARIABLE + out + ERROR_QUIET + OUTPUT_STRIP_TRAILING_WHITESPACE) + if(res EQUAL 0) + set(${_var} "CLEAN" PARENT_SCOPE) + else() + set(${_var} "DIRTY" PARENT_SCOPE) + endif() +endfunction() diff --git a/GetGitRevisionDescription.cmake.in b/GetGitRevisionDescription.cmake.in new file mode 100644 index 00000000..6d8b708e --- /dev/null +++ b/GetGitRevisionDescription.cmake.in @@ -0,0 +1,41 @@ +# +# Internal file for GetGitRevisionDescription.cmake +# +# Requires CMake 2.6 or newer (uses the 'function' command) +# +# Original Author: +# 2009-2010 Ryan Pavlik +# http://academic.cleardefinition.com +# Iowa State University HCI Graduate Program/VRAC +# +# Copyright Iowa State University 2009-2010. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +set(HEAD_HASH) + +file(READ "@HEAD_FILE@" HEAD_CONTENTS LIMIT 1024) + +string(STRIP "${HEAD_CONTENTS}" HEAD_CONTENTS) +if(HEAD_CONTENTS MATCHES "ref") + # named branch + string(REPLACE "ref: " "" HEAD_REF "${HEAD_CONTENTS}") + if(EXISTS "@GIT_DIR@/${HEAD_REF}") + configure_file("@GIT_DIR@/${HEAD_REF}" "@GIT_DATA@/head-ref" COPYONLY) + else() + configure_file("@GIT_DIR@/packed-refs" "@GIT_DATA@/packed-refs" COPYONLY) + file(READ "@GIT_DATA@/packed-refs" PACKED_REFS) + if(${PACKED_REFS} MATCHES "([0-9a-z]*) ${HEAD_REF}") + set(HEAD_HASH "${CMAKE_MATCH_1}") + endif() + endif() +else() + # detached HEAD + configure_file("@GIT_DIR@/HEAD" "@GIT_DATA@/head-ref" COPYONLY) +endif() + +if(NOT HEAD_HASH) + file(READ "@GIT_DATA@/head-ref" HEAD_HASH LIMIT 1024) + string(STRIP "${HEAD_HASH}" HEAD_HASH) +endif() diff --git a/Makefile b/Makefile.in similarity index 87% rename from Makefile rename to Makefile.in index 9f1fe759..c11a7181 100644 --- a/Makefile +++ b/Makefile.in @@ -1,9 +1,9 @@ PREFIX ?= /usr INCLUDE_DIR = ${PREFIX}/include LIBRARY_DIR = ${PREFIX}/lib -export LIBRARY_NAME = amqpcpp -export SONAME = 2.8 -export VERSION = 2.8.0 +export LIBRARY_NAME = ${PROJECT_NAME} +export SONAME = ${VERSION_SONAME} +export VERSION = ${VERSION_SHORT} all: $(MAKE) -C src all diff --git a/amqpcpp.pc.in b/amqpcpp.pc.in new file mode 100644 index 00000000..61d38a59 --- /dev/null +++ b/amqpcpp.pc.in @@ -0,0 +1,10 @@ +prefix=@DEST_DIR@ +libdir=@DEST_DIR@/lib +includedir=@DEST_DIR@/include + +Name: @PROJECT_NAME@ +Description: AMQP-CPP is a C++ library for communicating with a RabbitMQ message broker +Version: @VERSION_SHORT@ + +Libs: -l@PROJECT_NAME@ +Cflags: -I${includedir} diff --git a/examples/boost/1_hello_world/Makefile b/examples/boost/1_hello_world/Makefile new file mode 100644 index 00000000..b3b28023 --- /dev/null +++ b/examples/boost/1_hello_world/Makefile @@ -0,0 +1,23 @@ +CPP = g++ +CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g +LD = g++ +LDFLAGS = -lpthread -lboost_system -lamqpcpp +SOURCES = $(wildcard *.cpp) +OBJECTS = producer.o consumer.o + +all: producer \ + consumer + + +producer: ${OBJECTS} + ${LD} -o $@ producer.o ${LDFLAGS} + +consumer: ${OBJECTS} + ${LD} -o $@ consumer.o ${LDFLAGS} + + +clean: + ${RM} *.obj *~* ${OBJECTS} ${RESULT} + +${OBJECTS}: + ${CPP} ${CPPFLAGS} -o $@ ${@:%.o=%.cpp} diff --git a/examples/boost/1_hello_world/consumer.cpp b/examples/boost/1_hello_world/consumer.cpp new file mode 100644 index 00000000..01e056cf --- /dev/null +++ b/examples/boost/1_hello_world/consumer.cpp @@ -0,0 +1,69 @@ +/** + * consumer.cpp + * + * @author Alessandro Pischedda + * + * Compile with g++ -std=c++11 consumer.cpp -o consumer -lpthread -lboost_system -lamqpcpp + */ + +#include + +#include +#include +#include + +#include +#include + + +// callback operation when a message was received +auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + + std::string message_str(message.body(), message.bodySize()); + std::cout << "[x] Received '" << message_str << "'" << std::endl; +}; + + +int main(int argc, char* argv[]) +{ + struct Data { + std::string queue_name; + std::string message; + std::string routing_key; + std::string exchange_name; + }; + + Data data; + + data.queue_name = "hello"; + data.message = "Hello World!"; + data.routing_key = "hello"; + data.exchange_name = ""; + + boost::asio::io_service service; + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libboost + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + + AMQP::TcpChannel channel(&connection); + + // create the queue + channel.declareQueue(data.queue_name).onSuccess( + [&channel, &data](const std::string &name, uint32_t messagecount, uint32_t consumercount) + { + channel.consume(data.queue_name).onReceived(messageCb); + }); + + std::cout << "[*] Waiting for messages. To exit press CTRL+C" << std::endl; + service.run(); + + return 0; +} + + diff --git a/examples/boost/1_hello_world/producer.cpp b/examples/boost/1_hello_world/producer.cpp new file mode 100644 index 00000000..14e11668 --- /dev/null +++ b/examples/boost/1_hello_world/producer.cpp @@ -0,0 +1,65 @@ +/** + * producer.cpp + * + * @author Alessandro Pischedda + * + * Compile with g++ -std=c++11 producer.cpp -o producer -lpthread -lboost_system -lamqpcpp + */ + +#include + +#include +#include +#include + +#include +#include + +int main(int argc, char* argv[]) +{ + + struct Data { + std::string queue_name; + std::string message; + std::string routing_key; + std::string exchange_name; + }; + + Data data; + data.queue_name = "hello"; + data.message = "Hello World!"; + data.routing_key = "hello"; + data.exchange_name = ""; + + boost::asio::io_service service; + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libboost + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + + AMQP::TcpChannel channel(&connection); + + // create the queue + channel.declareQueue(data.queue_name).onSuccess( + [&connection, &service, &channel, &data](const std::string &name, uint32_t messagecount, uint32_t consumercount) + { + + channel.publish(data.exchange_name, data.routing_key, data.message); + std::cout << "[x] Sent " << data.message << std::endl; + + // close connection + connection.close(); + // stop io_service + service.stop(); + }); + std::cout << "Start producer...\n"; + service.run(); + + return 0; +} + diff --git a/examples/boost/2_work_queues/Makefile b/examples/boost/2_work_queues/Makefile new file mode 100644 index 00000000..7b78996a --- /dev/null +++ b/examples/boost/2_work_queues/Makefile @@ -0,0 +1,23 @@ +CPP = g++ +CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g +LD = g++ +LDFLAGS = -lpthread -lboost_system -lamqpcpp +SOURCES = $(wildcard *.cpOBJECTS +OBJECTS = new_task.o worker.o +all: \ + new_task \ + worker + + +new_task: ${OBJECTS} + ${LD} -o $@ new_task.o ${LDFLAGS} + +worker: ${OBJECTS} + ${LD} -o $@ worker.o ${LDFLAGS} + + +clean: + ${RM} *.obj *~* ${OBJECTS} ${RESULT} + +${OBJECTS}: + ${CPP} ${CPPFLAGS} -o $@ ${@:%.o=%.cpp} diff --git a/examples/boost/2_work_queues/new_task.cpp b/examples/boost/2_work_queues/new_task.cpp new file mode 100644 index 00000000..0405261e --- /dev/null +++ b/examples/boost/2_work_queues/new_task.cpp @@ -0,0 +1,68 @@ +/** + * producer.cpp + * + * @author Alessandro Pischedda + * + * Compile with g++ -std=c++11 new_task.cpp -o new_task -lpthread -lboost_system -lamqpcpp + */ + +#include + +#include +#include +#include + +#include +#include + +int main(int argc, char* argv[]) +{ + + struct Data { + std::string queue_name; + std::string message; + std::string routing_key; + std::string exchange_name; + }; + + Data data; + data.queue_name = "task_queue"; + data.message = "Hello World!"; + data.routing_key = "hello"; + data.exchange_name = ""; + + if (argc == 2) + data.message = argv[1]; + + boost::asio::io_service service; + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libboost + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + + AMQP::TcpChannel channel(&connection); + + // create the queue + channel.declareQueue(data.queue_name, AMQP::durable).onSuccess( + [&connection, &service, &channel, &data](const std::string &name, uint32_t messagecount, uint32_t consumercount) + { + + channel.publish(data.exchange_name, data.routing_key, data.message); + std::cout << "[x] Sent " << data.message << std::endl; + + // close connection + connection.close(); + // stop io_service + service.stop(); + }); + std::cout << "Start producer...\n"; + service.run(); + + return 0; +} + diff --git a/examples/boost/2_work_queues/worker.cpp b/examples/boost/2_work_queues/worker.cpp new file mode 100644 index 00000000..fa0b7285 --- /dev/null +++ b/examples/boost/2_work_queues/worker.cpp @@ -0,0 +1,71 @@ +/** + * consumer.cpp + * + * @author Alessandro Pischedda + * + * Compile with g++ -std=c++11 consumer.cpp -o consumer -lpthread -lboost_system -lamqpcpp + */ + +#include + +#include +#include +#include + +#include +#include + + + +int main(int argc, char* argv[]) +{ + struct Data { + std::string queue_name; + std::string message; + std::string routing_key; + std::string exchange_name; + }; + + Data data; + + data.queue_name = "hello"; + data.message = "Hello World!"; + data.routing_key = "hello"; + data.exchange_name = ""; + + boost::asio::io_service service; + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libboost + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + + AMQP::TcpChannel channel(&connection); + + // create the queue + channel.declareQueue(data.queue_name).onSuccess( + [&channel, &data](const std::string &name, uint32_t messagecount, uint32_t consumercount) + { + channel.setQos(1); + // callback operation when a message was received + auto messageCb = [&channel](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + + std::string message_str(message.body(), message.bodySize()); + std::cout << "[x] Received '" << message_str << "'" << std::endl; + channel.ack(); + }; + + channel.consume(data.queue_name).onReceived(messageCb); + }); + + std::cout << "[*] Waiting for messages. To exit press CTRL+C" << std::endl; + service.run(); + + return 0; +} + + diff --git a/examples/boost/Makefile b/examples/boost/Makefile new file mode 100644 index 00000000..b3b28023 --- /dev/null +++ b/examples/boost/Makefile @@ -0,0 +1,23 @@ +CPP = g++ +CPPFLAGS = -Wall -c -I. -O2 -flto -std=c++11 -g +LD = g++ +LDFLAGS = -lpthread -lboost_system -lamqpcpp +SOURCES = $(wildcard *.cpp) +OBJECTS = producer.o consumer.o + +all: producer \ + consumer + + +producer: ${OBJECTS} + ${LD} -o $@ producer.o ${LDFLAGS} + +consumer: ${OBJECTS} + ${LD} -o $@ consumer.o ${LDFLAGS} + + +clean: + ${RM} *.obj *~* ${OBJECTS} ${RESULT} + +${OBJECTS}: + ${CPP} ${CPPFLAGS} -o $@ ${@:%.o=%.cpp} diff --git a/examples/boost/consumer.cpp b/examples/boost/consumer.cpp new file mode 100644 index 00000000..bb4c9743 --- /dev/null +++ b/examples/boost/consumer.cpp @@ -0,0 +1,114 @@ +/** + * LibBoostAsio.cpp + * + * Test program to check AMQP functionality based on Boost's asio io_service. + * + * @author Gavin Smith + * + * Compile with g++ -std=c++14 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp + */ + +/** + * Dependencies + */ + +#include // std::shared_ptr + +#include +#include +#include + + +#include +#include + + +// callback function that is called when the consume operation starts +auto startCb = [](const std::string &consumertag) { + + std::cout << "consume operation started" << std::endl; +}; + +// callback function that is called when the consume operation failed +auto errorCb = [](const char *message) { + + std::cout << "consume operation failed" << std::endl; +}; + + +void printRabbitMessage(const AMQP::Message &message) +{ + std::string message_str(message.body(), message.bodySize()); + std::cout << "[" << message.exchange() << "] : " << message_str << std::endl; + if (message.hasCorrelationID()) + std::cout << "Correlation ID: " << message.correlationID() << std::endl; + if (message.hasReplyTo()) + std::cout << "Replay to : " << message.replyTo() << std::endl; + + +} + +// callback operation when a message was received +auto messageCb = [](const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { + + printRabbitMessage(message); +}; + + +/** + * Main program + * @return int + */ +int dynamic_main() +{ + + // access to the boost asio handler + // note: we suggest use of 2 threads - normally one is fin (we are simply demonstrating thread safety). + boost::asio::io_service service(4); + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libev + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + // + std::shared_ptr connection; + connection = std::make_shared(&handler, AMQP::Address("amqp://guest:guest@localhost/")); + std::cout << "Create connection to amqp://guest:guest@localhost/\n"; + + // we need a channel too + AMQP::TcpChannel channel(connection.get()); + std::cout << "Create channel\n"; + + channel.declareQueue("my_event_queue").onSuccess([&channel](const std::string& name, uint32_t messageCount, uint32_t consumerCount) { + + std::cout << "Queue '" << name << "' has been declared with " << messageCount << " messages and " << consumerCount << " consumers" << std::endl; + channel.bindQueue("EVENT_EXCHANGE", "my_event_queue", "event.am.*").onError([&name](const char* message){ + std::cout << "Binding " << name << " FAILED\n"; + std::cout << "Message: " << message << std::endl; + }).onSuccess([&channel](){ + + channel.consume("my_event_queue") + .onReceived(messageCb) + .onSuccess(startCb) + .onError(errorCb); + }); + + }); + + + // run the handler + // a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API. + std::cout << "Start example...\n"; + return service.run(); +} + + +int main() +{ + return dynamic_main(); + // return static_main(); + +} diff --git a/examples/boost/producer.cpp b/examples/boost/producer.cpp new file mode 100644 index 00000000..33f10868 --- /dev/null +++ b/examples/boost/producer.cpp @@ -0,0 +1,118 @@ +/** + * LibBoostAsio.cpp + * + * Test program to check AMQP functionality based on Boost's asio io_service. + * + * @author Alessandro Pischedda + * + * Compile with g++ -std=c++11 libboostasio.cpp -o boost_test -lpthread -lboost_system -lamqpcpp + */ + +#include +#include // std::shared_ptr + +#include +#include +#include +#include + +#include +#include + + +struct Options { + std::string exchange_name; + std::string exchange_type; + std::string queue_name; + std::string rabbit_server; +}; + + +void usage() +{ + std::cout << "Producer Help" ; +} + +bool handle_options(struct Options &opt, int argc, char* argv[]) +{ + char c; + + while((c=getopt(argc, argv, "n:t:q:s:h")) != -1) { + + switch(c) { + case 'n': + opt->exchange_name = std::string(optarg); + break; + + case 't': + opt->exchange_type = std::string(optarg); + break; + + case 's': + opt->rabbit_server = std::string(optarg); + break; + + case 'q': + opt->queue_name = std::string(optarg); + break; + + case 'h': usage(); + return false; + } // END SWITCH + } // END WHILE + + return true; +} + +void run(Options opt) +{ + + boost::asio::io_service service(); + + // create a work object to process our events. + boost::asio::io_service::work work(service); + + // handler for libboost + AMQP::LibBoostAsioHandler handler(service); + + // make a connection + AMQP::TcpConnection connection(&handler, AMQP::Address(opt.rabbit_server)); + + AMQP::TcpChannel channel(&connection); + + // create the queue + channel.declareQueue(queue_name).onSuccess([&connection](const std::string &name, uint32_t messagecount, uint32_t consumercount) { + + // report the name of the temporary queue + std::cout << "declared queue " << name << std::endl; + + channel.declareExchange(exchange_name).onError([](const char* message) + { + std::cout << "Error: " << message << std::endl; + }).onSuccess([&channel, &connection]() + { + std::string producer_message; + for(int i = 0; ; i++) + { + producer_message = "Message " + std::to_string(i); + channel.publish(exchange_name, "", producer_message); + } + }); + + }); + + // run the handler + // a t the moment, one will need SIGINT to stop. In time, should add signal handling through boost API. + std::cout << "Start producer...\n"; + service.run(); +} + + + +int main(int argc, char* argv[]) +{ + Options opt; + if (!handle_options(opt, argc, argv) + return -1; + run(opt); +} diff --git a/include/libboostasio.h b/include/libboostasio.h index 3f80b89c..f0d4ed4e 100644 --- a/include/libboostasio.h +++ b/include/libboostasio.h @@ -26,12 +26,35 @@ #include #include -// C++17 has 'weak_from_this()' support. -#if __cplusplus >= 201701L -#define PTR_FROM_THIS weak_from_this -#else -#define PTR_FROM_THIS shared_from_this -#endif + +/////////////////////////////////////////////////////////////////// +#define STRAND_SOCKET_HANDLER(_fn) \ +[fn = _fn, strand = _strand](const boost::system::error_code &ec, \ + const std::size_t bytes_transferred) \ +{ \ + const std::shared_ptr apStrand = strand.lock(); \ + if (!apStrand) \ + { \ + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled),std::size_t{0}); \ + return; \ + } \ + \ + apStrand->dispatch(boost::bind(fn,ec,bytes_transferred)); \ +} + +/////////////////////////////////////////////////////////////////// +#define STRAND_TIMER_HANDLER(_fn) \ +[fn = _fn, strand = _strand](const boost::system::error_code &ec) \ +{ \ + const std::shared_ptr apStrand = strand.lock(); \ + if (!apStrand) \ + { \ + fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); \ + return; \ + } \ + \ + apStrand->dispatch(boost::bind(fn,ec)); \ +} /** * Set up namespace @@ -97,64 +120,6 @@ class LibBoostAsioHandler : public virtual TcpHandler */ bool _write_pending{false}; - using handler_cb = boost::function; - using io_handler = boost::function; - - /** - * Builds a io handler callback that executes the io callback in a strand. - * @param io_handler The handler callback to dispatch - * @return handler_cb A function wrapping the execution of the handler function in a io_service::strand. - */ - handler_cb get_dispatch_wrapper(io_handler fn) - { - return [fn, strand = _strand](const boost::system::error_code &ec, const std::size_t bytes_transferred) - { - const std::shared_ptr apStrand = strand.lock(); - if (!apStrand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled), std::size_t{0}); - return; - } - apStrand->dispatch(boost::bind(fn, ec, bytes_transferred)); - }; - } - - /** - * Binds and returns a read handler for the io operation. - * @param connection The connection being watched. - * @param fd The file descripter being watched. - * @return handler callback - */ - handler_cb get_read_handler(TcpConnection *const connection, const int fd) - { - auto fn = boost::bind(&Watcher::read_handler, - this, - _1, - _2, - PTR_FROM_THIS(), - connection, - fd); - return get_dispatch_wrapper(fn); - } - - /** - * Binds and returns a read handler for the io operation. - * @param connection The connection being watched. - * @param fd The file descripter being watched. - * @return handler callback - */ - handler_cb get_write_handler(TcpConnection *const connection, const int fd) - { - auto fn = boost::bind(&Watcher::write_handler, - this, - _1, - _2, - PTR_FROM_THIS(), - connection, - fd); - return get_dispatch_wrapper(fn); - } - /** * Handler method that is called by boost's io_service when the socket pumps a read event. * @param ec The status of the callback. @@ -182,10 +147,21 @@ class LibBoostAsioHandler : public virtual TcpHandler connection->process(fd, AMQP::readable); _read_pending = true; - - _socket.async_read_some( - boost::asio::null_buffers(), - get_read_handler(connection, fd)); + + _socket.async_read_some(boost::asio::null_buffers(), + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::read_handler, + this, + boost::arg<1>(), + boost::arg<2>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + fd))); } } @@ -217,9 +193,20 @@ class LibBoostAsioHandler : public virtual TcpHandler _write_pending = true; - _socket.async_write_some( - boost::asio::null_buffers(), - get_write_handler(connection, fd)); + _socket.async_write_some(boost::asio::null_buffers(), + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::write_handler, + this, + boost::arg<1>(), + boost::arg<2>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + fd))); } } @@ -275,9 +262,20 @@ class LibBoostAsioHandler : public virtual TcpHandler { _read_pending = true; - _socket.async_read_some( - boost::asio::null_buffers(), - get_read_handler(connection, fd)); + _socket.async_read_some(boost::asio::null_buffers(), + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::read_handler, + this, + boost::arg<1>(), + boost::arg<2>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + fd))); } // 2. Handle writes? @@ -288,9 +286,20 @@ class LibBoostAsioHandler : public virtual TcpHandler { _write_pending = true; - _socket.async_write_some( - boost::asio::null_buffers(), - get_write_handler(connection, fd)); + _socket.async_write_some(boost::asio::null_buffers(), + STRAND_SOCKET_HANDLER( + boost::bind(&Watcher::write_handler, + this, + boost::arg<1>(), + boost::arg<2>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + fd))); } } }; @@ -320,39 +329,11 @@ class LibBoostAsioHandler : public virtual TcpHandler */ boost::asio::deadline_timer _timer; - using handler_fn = boost::function; - /** - * Binds and returns a lamba function handler for the io operation. - * @param connection The connection being watched. - * @param timeout The file descripter being watched. - * @return handler callback - */ - handler_fn get_handler(TcpConnection *const connection, const uint16_t timeout) - { - auto fn = boost::bind(&Timer::timeout, - this, - _1, - PTR_FROM_THIS(), - connection, - timeout); - return [fn, strand = _strand](const boost::system::error_code &ec) - { - const std::shared_ptr apStrand = strand.lock(); - if (!apStrand) - { - fn(boost::system::errc::make_error_code(boost::system::errc::operation_canceled)); - return; - } - apStrand->dispatch(boost::bind(fn, ec)); - }; - } - /** * Callback method that is called by libev when the timer expires - * @param ec error code returned from loop * @param loop The loop in which the event was triggered - * @param connection - * @param timeout + * @param timer Internal timer object + * @param revents The events that triggered this call */ void timeout(const boost::system::error_code &ec, std::weak_ptr awpThis, @@ -376,7 +357,18 @@ class LibBoostAsioHandler : public virtual TcpHandler _timer.expires_at(_timer.expires_at() + boost::posix_time::seconds(timeout)); // Posts the timer event - _timer.async_wait(get_handler(connection, timeout)); + _timer.async_wait(STRAND_TIMER_HANDLER( + boost::bind(&Timer::timeout, + this, + boost::arg<1>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + timeout))); } } @@ -431,11 +423,19 @@ class LibBoostAsioHandler : public virtual TcpHandler // stop timer in case it was already set stop(); - // Reschedule the timer for the future: _timer.expires_from_now(boost::posix_time::seconds(timeout)); - - // Posts the timer event - _timer.async_wait(get_handler(connection, timeout)); + _timer.async_wait(STRAND_TIMER_HANDLER( + boost::bind(&Timer::timeout, + this, + boost::arg<1>(), +// C++17 has 'weak_from_this()' support. +#if __cplusplus >= 201701L + weak_from_this(), +#else + shared_from_this(), +#endif + connection, + timeout))); } };