Skip to content

Commit

Permalink
last minute cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashi Ranjan committed Jun 24, 2015
1 parent 8f90c6d commit 6892b9c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ public class MessangerPool {
@Override
public void remove() {
Messenger messenger = MESSANGERPOOL.get();
logger.debug("Messenger removed: " + messenger.toString());
logger.debug("Messenger removed: " + messenger.toString() + " for thread: " + Thread.currentThread().getName());
messenger.terminate();
super.remove();
}

@Override
protected Messenger initialValue() {
Messenger messenger = new Messenger();
logger.debug("Messenger Created: " + messenger.toString());
return new Messenger();
logger.debug("Messenger Created: " + messenger.toString() + " for thread: " + Thread.currentThread().getName());
return messenger;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ a copy of this software and associated documentation files (the
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
*/

package com.mashape.analytics.agent.connection.pool;

Expand All @@ -39,38 +39,62 @@ a copy of this software and associated documentation files (the
import java.util.Map;

import org.apache.log4j.Logger;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

import com.google.gson.Gson;
import com.mashape.analytics.agent.modal.Creator;
import com.mashape.analytics.agent.modal.Entry;
import com.mashape.analytics.agent.modal.Har;
import com.mashape.analytics.agent.modal.Log;
import com.mashape.analytics.agent.modal.Message;

/*
* Opens a connection to Analytics server and sends data
*/
public class Messenger implements Executor {

private static Logger LOGGER = Logger.getLogger(Messenger.class);

private ZMQ.Context context;
private ZMQ.Socket socket;
private Context context;
private Socket socket;

public Messenger(){
context = ZMQ.context(1);
socket = context.socket(ZMQ.PUSH);
LOGGER.debug("Socket created: " + socket.toString());
public Messenger() {
context = ZMQ.context(1);
socket = context.socket(ZMQ.PUSH);
LOGGER.debug("Socket created: " + socket.toString());
}

public void execute(Map<String, Object> analyticsData) {
Message msg = getMessage(analyticsData);
String data = ALF_VERSION_PREFIX + new Gson().toJson(msg);
String analyticsServerUrl = analyticsData.get(ANALYTICS_SERVER_URL).toString();
String port = analyticsData.get(ANALYTICS_SERVER_PORT).toString();
socket.connect("tcp://" + analyticsServerUrl + ":" + port);
socket.send(data);
LOGGER.debug("Message sent:" + data);
int tryLeft = 3;
while (tryLeft > 0) {
try{
send(analyticsData);
break;
}catch (Exception e) {
if(tryLeft > 0){
socket.close();
socket = context.socket(ZMQ.PUSH);
LOGGER.error("Failed to send data, trying again:", e);
}else{
LOGGER.error("Failed to send data, dropping data", e);
}
tryLeft--;
}
}
}

private void send(Map<String, Object> analyticsData) {
Message msg = getMessage(analyticsData);
String data = ALF_VERSION_PREFIX + new Gson().toJson(msg);
String analyticsServerUrl = analyticsData.get(ANALYTICS_SERVER_URL).toString();
String port = analyticsData.get(ANALYTICS_SERVER_PORT).toString();
socket.connect("tcp://" + analyticsServerUrl + ":" + port);
socket.send(data);
LOGGER.debug("Message sent:" + data);
//socket.close();
}

public void terminate() {
Expand All @@ -80,7 +104,6 @@ public void terminate() {
}
if (context != null) {
context.close();
context.term();
}
}

Expand Down Expand Up @@ -116,11 +139,10 @@ private Creator setCreator() {
creator.setVersion(AGENT_VERSION);
return creator;
}

@Override
protected void finalize() throws Throwable {
protected void finalize(){
this.terminate();
LOGGER.debug("Messanger resources destroyed:"+ this.toString());
super.finalize();
LOGGER.debug("Messanger resources destroyed:" + this.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ a copy of this software and associated documentation files (the
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -64,8 +65,8 @@ a copy of this software and associated documentation files (the
import com.mashape.analytics.agent.wrapper.ResponseInterceptorWrapper;

/**
* AnalyticsFilter is a custom filter designed to intercept http request
* and response and send compiled data to Mashape analytics server.
* AnalyticsFilter is a custom filter designed to intercept http request and
* response and send compiled data to Mashape analytics server.
*
*/

Expand All @@ -85,7 +86,9 @@ public class AnalyticsFilter implements Filter {
public void destroy() {
try {
worker.shutdown();
worker.awaitTermination(30, TimeUnit.SECONDS);
while (!worker.awaitTermination(30, TimeUnit.SECONDS)) {
logger.debug("Waiting to theads to finish...");
}
} catch (InterruptedException e) {
logger.error("Error during shutdown of analytics pool", e);
}
Expand Down Expand Up @@ -148,6 +151,8 @@ private void callAsyncAnalytics(Date requestReceivedTime, RequestInterceptorWrap
messageProperties.put(CLIENT_IP_ADDRESS, request.getRemoteAddr());
messageProperties.put(ENVIRONMENT, environment);
worker.execute(new SendAnalyticsTask(messageProperties));
} catch (RejectedExecutionException e) {
logger.error("Queue is full, dropping the data", e);
} catch (Throwable x) {
logger.error("Failed to send analytics data", x);
}
Expand All @@ -165,10 +170,10 @@ public void init(FilterConfig config) throws ServletException {
logger.error("Analytics URl or Port or Token not set");
return;
}
int poolSize = getEnvVarOrDefault(WORKER_QUEUE_COUNT, 100);
int socketPoolMin = getEnvVarOrDefault(SOCKET_POOL_SIZE_MIN, 10);
int socketPoolMax = getEnvVarOrDefault(SOCKET_POOL_SIZE_MAX, 20);
int poolUpdateInterval = getEnvVarOrDefault(SOCKET_POOL_UPDATE_INTERVAL, 30);
int poolSize = getEnvVarOrDefault(WORKER_QUEUE_COUNT, 5000);
int socketPoolMin = getEnvVarOrDefault(SOCKET_POOL_SIZE_MIN, 2);
int socketPoolMax = getEnvVarOrDefault(SOCKET_POOL_SIZE_MAX, 4);
int poolUpdateInterval = getEnvVarOrDefault(SOCKET_POOL_UPDATE_INTERVAL, 20);
environment = getEnvironment();
blockingQueue = new LinkedBlockingQueue<Runnable>(poolSize);
worker = new ThreadPoolExecutor(socketPoolMin, socketPoolMax, poolUpdateInterval, TimeUnit.MILLISECONDS, blockingQueue);
Expand Down

0 comments on commit 6892b9c

Please sign in to comment.