Skip to content

Commit

Permalink
adding more features to wdt uri
Browse files Browse the repository at this point in the history
Summary: Adding hostname:port, support for ipv6, and url shortening if ports are continous

Reviewed By: @​@ldemailly

Differential Revision: D2454584
  • Loading branch information
nikunjy authored and ldemailly committed Sep 22, 2015
1 parent ec60983 commit a4f8512
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 26 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 3.2)
# There is no C per se in WDT but if you use CXX only here many checks fail
# Version is Major.Minor.YYMMDDX for up to 10 releases per day
# Minor currently is also the protocol version - has to match with Protocol.cpp
project("WDT" LANGUAGES C CXX VERSION 1.19.1509200)
project("WDT" LANGUAGES C CXX VERSION 1.19.1509220)

# On MacOS this requires the latest (master) CMake (and/or CMake 3.1.1/3.2)
set(CMAKE_CXX_STANDARD 11)
Expand Down
167 changes: 152 additions & 15 deletions WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ void WdtUri::setHostName(const string& hostName) {
hostName_ = hostName;
}

void WdtUri::setPort(int32_t port) {
port_ = port;
}

void WdtUri::setQueryParam(const string& key, const string& value) {
queryParams_[key] = value;
}
Expand All @@ -32,8 +36,17 @@ ErrorCode WdtUri::getErrorCode() const {
}

string WdtUri::generateUrl() const {
string url;
folly::toAppend(WDT_URL_PREFIX, hostName_, &url);
string url = WDT_URL_PREFIX;
if (hostName_.find(':') != string::npos) {
// Enclosing ipv6 address by [] so that it can be escaped
folly::toAppend('[', hostName_, ']', &url);
} else {
folly::toAppend(hostName_, &url);
}
if (port_ > 0) {
// Must have positive port value
folly::toAppend(":", port_, &url);
}
char prefix = '?';
for (const auto& pair : queryParams_) {
folly::toAppend(prefix, pair.first, "=", pair.second, &url);
Expand All @@ -54,17 +67,72 @@ ErrorCode WdtUri::process(const string& url) {
return URI_PARSE_ERROR;
}
urlPiece = StringPiece(url, WDT_URL_PREFIX.size());
size_t paramsIndex = urlPiece.find("?");
if (paramsIndex == string::npos) {
paramsIndex = urlPiece.size();
if (urlPiece.empty()) {
LOG(ERROR) << "Empty host name " << url;
return URI_PARSE_ERROR;;
}
ErrorCode status = OK;
hostName_.assign(urlPiece.data(), paramsIndex);
if (hostName_.size() == 0) {
LOG(ERROR) << "URL doesn't have a valid host name " << url;
// Parse hot name
if (urlPiece[0] == '[') {
urlPiece.advance(1);
size_t hostNameEnd = urlPiece.find(']');
if (hostNameEnd == string::npos) {
LOG(ERROR) << "Didn't find ] for ipv6 address " << url;
return URI_PARSE_ERROR;
}
hostName_.assign(urlPiece.data(), 0, hostNameEnd);
urlPiece.advance(hostNameEnd + 1);
} else {
size_t urlIndex = 0;
for (;urlIndex < urlPiece.size(); ++urlIndex) {
if (urlPiece[urlIndex] == ':') {
break;
}
if (urlPiece[urlIndex] == '?') {
break;
}
}
hostName_.assign(urlPiece.data(), 0, urlIndex);
urlPiece.advance(urlIndex);
}

if (hostName_.empty()) {
status = URI_PARSE_ERROR;
LOG(ERROR) << "Empty hostname " << url;
}

if (urlPiece.empty()) {
return status;
}

// parse port number
if (urlPiece[0] == ':') {
urlPiece.advance(1);
size_t paramsIndex = urlPiece.find('?');
if (paramsIndex == string::npos) {
paramsIndex = urlPiece.size();
}
try {
string portStr;
portStr.assign(urlPiece.data(), 0, paramsIndex);
port_ = folly::to<int32_t>(portStr);
} catch(std::exception& e) {
LOG(ERROR) << "Invalid port, can't be parsed " << url;
status = URI_PARSE_ERROR;
}
urlPiece.advance(paramsIndex);
}

if (urlPiece.empty()) {
return status;;
}

if (urlPiece[0] != '?') {
LOG(ERROR) << "Unexpected delimiter for params " << urlPiece[0];
return URI_PARSE_ERROR;
}
urlPiece.advance(paramsIndex + (paramsIndex < urlPiece.size()));
urlPiece.advance(1);
//parse params
while (!urlPiece.empty()) {
StringPiece keyValuePair = urlPiece.split_step('&');
if (keyValuePair.empty()) {
Expand All @@ -89,6 +157,10 @@ string WdtUri::getHostName() const {
return hostName_;
}

int32_t WdtUri::getPort() const {
return port_;
}

string WdtUri::getQueryParam(const string& key) const {
auto it = queryParams_.find(key);
if (it == queryParams_.end()) {
Expand All @@ -104,6 +176,7 @@ const unordered_map<string, string>& WdtUri::getQueryParams() const {

void WdtUri::clear() {
hostName_.clear();
port_ = -1;
queryParams_.clear();
}

Expand All @@ -115,12 +188,17 @@ WdtUri& WdtUri::operator=(const string& url) {

const string WdtTransferRequest::TRANSFER_ID_PARAM{"id"};
/** RECeiver's Protocol Version */
const string WdtTransferRequest::PROTOCOL_VERSION_PARAM{"recpv"};
const string WdtTransferRequest::RECEIVER_PROTOCOL_VERSION_PARAM{"recpv"};
const string WdtTransferRequest::DIRECTORY_PARAM{"dir"};
const string WdtTransferRequest::PORTS_PARAM{"ports"};
const string WdtTransferRequest::START_PORT_PARAM{"start_port"};
const string WdtTransferRequest::NUM_PORTS_PARAM{"num_ports"};

WdtTransferRequest::WdtTransferRequest(const vector<int32_t>& ports) {
this->ports = ports;
// Sort the ports so that if they are a sequence then you
// can detect that
sort(this->ports.begin(), this->ports.end());
}

WdtTransferRequest::WdtTransferRequest(int startPort, int numPorts,
Expand All @@ -142,11 +220,11 @@ WdtTransferRequest::WdtTransferRequest(const string& uriString) {
transferId = wdtUri.getQueryParam(TRANSFER_ID_PARAM);
directory = wdtUri.getQueryParam(DIRECTORY_PARAM);
try {
protocolVersion =
folly::to<int64_t>(wdtUri.getQueryParam(PROTOCOL_VERSION_PARAM));
protocolVersion = folly::to<int64_t>(
wdtUri.getQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM));
} catch (std::exception& e) {
LOG(ERROR) << "Error parsing protocol version "
<< wdtUri.getQueryParam(PROTOCOL_VERSION_PARAM);
<< wdtUri.getQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM);
errorCode = URI_PARSE_ERROR;
}
string portsStr(wdtUri.getQueryParam(PORTS_PARAM));
Expand All @@ -164,6 +242,33 @@ WdtTransferRequest::WdtTransferRequest(const string& uriString) {
}
}
} while (!portsList.empty());
if (!ports.empty()) {
return;
}
// Figure out ports using other params only if there was no port list
string startPortStr = wdtUri.getQueryParam(START_PORT_PARAM);
string numPortsStr = wdtUri.getQueryParam(NUM_PORTS_PARAM);
const auto& options = WdtOptions::get();
int32_t startPort = wdtUri.getPort();
if (startPort <= 0) {
startPort = options.start_port;
if (!startPortStr.empty()) {
try {
startPort = folly::to<int32_t>(startPortStr);
} catch (std::exception& e) {
LOG(ERROR) << "Couldn't convert start port " << startPortStr;
}
}
}
int numPorts = options.num_ports;
if (!numPortsStr.empty()) {
try {
numPorts = folly::to<int32_t>(numPortsStr);
} catch (std::exception& e) {
LOG(ERROR) << "Couldn't convert num ports " << numPortsStr;
}
}
ports = WdtBase::genPortsVector(startPort, numPorts);
}

string WdtTransferRequest::generateUrl(bool genFull) const {
Expand All @@ -174,15 +279,38 @@ string WdtTransferRequest::generateUrl(bool genFull) const {
WdtUri wdtUri;
wdtUri.setHostName(hostName);
wdtUri.setQueryParam(TRANSFER_ID_PARAM, transferId);
wdtUri.setQueryParam(PROTOCOL_VERSION_PARAM,
wdtUri.setQueryParam(RECEIVER_PROTOCOL_VERSION_PARAM,
folly::to<string>(protocolVersion));
wdtUri.setQueryParam(PORTS_PARAM, getSerializedPortsList());
serializePorts(wdtUri);
if (genFull) {
wdtUri.setQueryParam(DIRECTORY_PARAM, directory);
}
return wdtUri.generateUrl();
}

void WdtTransferRequest::serializePorts(WdtUri& wdtUri) const {
// Serialize to a port list if the ports are not
// contigous sequence else wdt://hostname:port
if (ports.size() == 0) {
return;
}
int32_t prevPort = ports[0];
bool hasHoles = false;
for (size_t i = 1; i < ports.size(); i++) {
if (ports[i] != prevPort + 1) {
hasHoles = true;
break;
}
prevPort = ports[i];
}
if (hasHoles) {
wdtUri.setQueryParam(PORTS_PARAM, getSerializedPortsList());
} else {
wdtUri.setPort(ports[0]);
wdtUri.setQueryParam(NUM_PORTS_PARAM, folly::to<string>(ports.size()));
}
}

string WdtTransferRequest::getSerializedPortsList() const {
string portsList = "";
for (size_t i = 0; i < ports.size(); i++) {
Expand Down Expand Up @@ -214,6 +342,15 @@ WdtBase::~WdtBase() {
abortChecker_ = nullptr;
}

std::vector<int32_t> WdtBase::genPortsVector(int32_t startPort,
int32_t numPorts) {
std::vector<int32_t> ports;
for (int32_t i = 0; i < numPorts; i++) {
ports.push_back(startPort + i);
}
return ports;
}

void WdtBase::abort(const ErrorCode abortCode) {
folly::RWSpinLock::WriteHolder guard(abortCodeLock_);
if (abortCode == VERSION_MISMATCH && abortCode_ != OK) {
Expand Down
23 changes: 21 additions & 2 deletions WdtBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class WdtUri {
/// Get the host name of the url
std::string getHostName() const;

/// Get the port number
int32_t getPort() const;

/// Get the query param by key
std::string getQueryParam(const std::string& key) const;

Expand All @@ -54,6 +57,9 @@ class WdtUri {
/// Sets hostname to generate a url
void setHostName(const std::string& hostName);

/// Set the port for the uri
void setPort(int32_t port);

/// Sets a query param in the query params map
void setQueryParam(const std::string& key, const std::string& value);

Expand Down Expand Up @@ -85,9 +91,12 @@ class WdtUri {
/// Prefix of the wdt url
const std::string WDT_URL_PREFIX{"wdt://"};

/// Hostname where the receiver is running
/// Hostname/ip address in the uri
std::string hostName_{""};

/// Port of the uri
int32_t port_{-1};

/// Error code that reflects that status of parsing url
ErrorCode errorCode_{OK};
};
Expand Down Expand Up @@ -137,6 +146,9 @@ struct WdtTransferRequest {
/// Serialize this structure into a url string containing all fields
std::string generateUrl(bool genFull = false) const;

/// Serialize the ports into uri
void serializePorts(WdtUri& wdtUri) const;

/// Get stringified port list
std::string getSerializedPortsList() const;

Expand All @@ -145,9 +157,12 @@ struct WdtTransferRequest {

/// Names of the get parameters for different fields
const static std::string TRANSFER_ID_PARAM;
const static std::string PROTOCOL_VERSION_PARAM;
/** Constant for for the protocol version get parameter in uri */
const static std::string RECEIVER_PROTOCOL_VERSION_PARAM;
const static std::string DIRECTORY_PARAM;
const static std::string PORTS_PARAM;
const static std::string START_PORT_PARAM;
const static std::string NUM_PORTS_PARAM;
};

/**
Expand All @@ -159,6 +174,10 @@ class WdtBase {
/// Constructor
WdtBase();

/// Get ports vector from startPort and numPorts
static std::vector<int32_t> genPortsVector(int32_t startPort,
int32_t numPorts);

/**
* Does the setup before start, returns the transfer request
* that corresponds to the information relating to the sender
Expand Down
4 changes: 2 additions & 2 deletions WdtConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

#define WDT_VERSION_MAJOR 1
#define WDT_VERSION_MINOR 19
#define WDT_VERSION_BUILD 1509200
#define WDT_VERSION_BUILD 1509220
// Add -fbcode to version str
#define WDT_VERSION_STR "1.19.1509200-fbcode"
#define WDT_VERSION_STR "1.19.1509220-fbcode"
// Tie minor and proto version
#define WDT_PROTOCOL_VERSION WDT_VERSION_MINOR

Expand Down
Loading

0 comments on commit a4f8512

Please sign in to comment.