Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat fixed #217

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ if (CMAKE_CXX_FLAGS STREQUAL ""
AND NOT DEFINED SAC_CXX_FLAGS_SET)
if (CMAKE_COMPILER_IS_GNUCXX
OR CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
SET(CMAKE_CXX_FLAGS "-Wall -Wextra" CACHE STRING "Flags used by the compiler during all build types." FORCE)
SET(CMAKE_CXX_FLAGS "-std=c++11 -Wall -Wextra" CACHE STRING "Flags used by the compiler during all build types." FORCE)
endif ()
set(SAC_CXX_FLAGS_SET TRUE CACHE INTERNAL "Have the SAC default compiler flags been set?")
endif ()
Expand Down
28 changes: 26 additions & 2 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,17 @@ Channel::Channel(const std::string &host,
const std::string &password,
const std::string &vhost,
int frame_max) :
Channel(host, port, username, password, vhost, frame_max, 0)
{
}

Channel::Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
int heartbeat) :
m_impl(new Detail::ChannelImpl)
{
m_impl->m_connection = amqp_new_connection();
Expand All @@ -112,7 +123,7 @@ Channel::Channel(const std::string &host,
int sock = amqp_socket_open(socket, host.c_str(), port);
m_impl->CheckForError(sock);

m_impl->DoLogin(username, password, vhost, frame_max);
m_impl->DoLogin(username, password, vhost, frame_max, heartbeat);
}
catch (...)
{
Expand All @@ -123,13 +134,25 @@ Channel::Channel(const std::string &host,
m_impl->SetIsConnected(true);
}

Channel::Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
const SSLConnectionParams &ssl_params)
: Channel(host, port, username, password, vhost, frame_max, 0, ssl_params)
{
}

#ifdef SAC_SSL_SUPPORT_ENABLED
Channel::Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
int heartbeat,
const SSLConnectionParams &ssl_params)
: m_impl(new Detail::ChannelImpl)
{
Expand Down Expand Up @@ -174,7 +197,7 @@ Channel::Channel(const std::string &host,
status, "Error setting client certificate for socket");
}

m_impl->DoLogin(username, password, vhost, frame_max);
m_impl->DoLogin(username, password, vhost, frame_max, heartbeat);
}
catch (...)
{
Expand All @@ -191,6 +214,7 @@ Channel::Channel(const std::string &,
const std::string &,
const std::string &,
int ,
int ,
const SSLConnectionParams &)
{
throw std::logic_error("SSL support has not been compiled into SimpleAmqpClient");
Expand Down
9 changes: 4 additions & 5 deletions src/ChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

#include <string.h>

#define BROKER_HEARTBEAT 0

namespace AmqpClient
{
Expand All @@ -68,7 +67,7 @@ ChannelImpl::~ChannelImpl()
}

void ChannelImpl::DoLogin(const std::string &username,
const std::string &password, const std::string &vhost, int frame_max)
const std::string &password, const std::string &vhost, int frame_max, int heartbeat)
{
amqp_table_entry_t capabilties[1];
amqp_table_entry_t capability_entry;
Expand All @@ -88,7 +87,7 @@ void ChannelImpl::DoLogin(const std::string &username,
client_properties.entries = &capability_entry;

CheckRpcReply(0, amqp_login_with_properties(m_connection, vhost.c_str(), 0,
frame_max, BROKER_HEARTBEAT, &client_properties,
frame_max, heartbeat, &client_properties,
AMQP_SASL_METHOD_PLAIN, username.c_str(), password.c_str()));
}

Expand Down Expand Up @@ -372,15 +371,15 @@ bool ChannelImpl::CheckForQueuedMessageOnChannel(amqp_channel_t channel) const
return true;
}

void ChannelImpl::AddToFrameQueue(const amqp_frame_t &frame)
void ChannelImpl::AddToFrameQueue(const amqp_frame_t &frame, boost::chrono::microseconds timeout)
{
m_frame_queue.push_back(frame);

if (CheckForQueuedMessageOnChannel(frame.channel))
{
boost::array<amqp_channel_t, 1> channel = {{frame.channel}};
Envelope::ptr_t envelope;
if (!ConsumeMessageOnChannelInner(channel, envelope, -1))
if (!ConsumeMessageOnChannelInner(channel, envelope, boost::chrono::duration_cast<boost::chrono::milliseconds>(timeout).count()))
{
throw std::logic_error("ConsumeMessageOnChannelInner returned false unexpectedly");
}
Expand Down
26 changes: 23 additions & 3 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable
static const std::string EXCHANGE_TYPE_FANOUT;
static const std::string EXCHANGE_TYPE_TOPIC;

/**
/**
* Creates a new channel object
* Creates a new connection to an AMQP broker using the supplied parameters and opens
* a single channel for use
Expand All @@ -82,16 +82,18 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable
* @param channel_max Request that the server limit the number of channels for
* this connection to the specified parameter, a value of zero will use the broker-supplied value
* @param frame_max Request that the server limit the maximum size of any frame to this value
* @param heartbeat The number of seconds between heartbeats or 0 to disable heartbeats
* @return a new Channel object pointer
*/
static ptr_t Create(const std::string &host = "127.0.0.1",
int port = 5672,
const std::string &username = "guest",
const std::string &password = "guest",
const std::string &vhost = "/",
int frame_max = 131072)
int frame_max = 131072,
int heartbeat = 0)
{
return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max);
return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, heartbeat);
}

protected:
Expand Down Expand Up @@ -173,6 +175,24 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable
const std::string &password,
const std::string &vhost,
int frame_max,
int heartbeat);


explicit Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
const SSLConnectionParams &ssl_params);

explicit Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
int heartbeat,
const SSLConnectionParams &ssl_params);


Expand Down
6 changes: 3 additions & 3 deletions src/SimpleAmqpClient/ChannelImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ class ChannelImpl : boost::noncopyable
typedef channel_map_t::iterator channel_map_iterator_t;

void DoLogin(const std::string &username, const std::string &password,
const std::string &vhost, int frame_max);
const std::string &vhost, int frame_max, int heartbeat);
amqp_channel_t GetChannel();
void ReturnChannel(amqp_channel_t channel);
bool IsChannelOpen(amqp_channel_t channel);

bool GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout);

bool CheckForQueuedMessageOnChannel(amqp_channel_t message_on_channel) const;
void AddToFrameQueue(const amqp_frame_t &frame);
void AddToFrameQueue(const amqp_frame_t &frame, boost::chrono::microseconds timeout);

template <class ChannelListType>
bool GetNextFrameFromBrokerOnChannel(const ChannelListType channels, amqp_frame_t &frame_out,
Expand Down Expand Up @@ -106,7 +106,7 @@ class ChannelImpl : boost::noncopyable
}
else
{
AddToFrameQueue(frame);
AddToFrameQueue(frame, timeout_left);
}

if (timeout != boost::chrono::microseconds::max())
Expand Down
2 changes: 1 addition & 1 deletion src/SimpleAmqpClient/Version.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@

#define SIMPLEAMQPCLIENT_VERSION_MAJOR 2
#define SIMPLEAMQPCLIENT_VERSION_MINOR 5
#define SIMPLEAMQPCLIENT_VERSION_PATCH 0
#define SIMPLEAMQPCLIENT_VERSION_PATCH 1

#endif /* SIMPLEAMQPCLIENT_VERSION_H_ */