Skip to content

Commit

Permalink
[UNDERTOW-1794] improve DefaultAccessLogReceiver to obey contract!
Browse files Browse the repository at this point in the history
  • Loading branch information
baranowb committed Apr 27, 2021
1 parent 43244cc commit 8ba6d4e
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 91 deletions.
17 changes: 17 additions & 0 deletions core/src/main/java/io/undertow/UndertowLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,21 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String
@LogMessage(level = DEBUG)
@Message(id = 5095, value = "SSLEngine delegated task was rejected")
void sslEngineDelegatedTaskRejected(@Cause RejectedExecutionException ree);

@LogMessage(level = DEBUG)
@Message(id = 5096, value = "Access Log Worker failed to transition gracefuly.")
void accessLogWorkerFailureOnTransition();

@LogMessage(level = DEBUG)
@Message(id = 5097, value = "Access Log Worker failed to reschedule.")
void accessLogWorkerFailureOnReschedule();

@LogMessage(level = DEBUG)
@Message(id = 5098, value = "Access Log Worker did not terminate cleanly.")
void accessLogWorkerNoTermination();

@LogMessage(level = DEBUG)
@Message(id = 5099, value = "Interruption in close()")
void closeInterrupted(@Cause InterruptedException ie);

}
3 changes: 3 additions & 0 deletions core/src/main/java/io/undertow/UndertowMessages.java
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,7 @@ public interface UndertowMessages {

@Message(id = 198, value = "Blocking write timed out after %s nanoseconds.")
WriteTimeoutException blockingWriteTimedOut(long timeoutNanoseconds);

@Message(id = 199, value = "Failed to schedule access message. Access logger is closing.")
IllegalStateException failedToLogAccessOnClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,16 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;

/**
* Log Receiver that stores logs in a directory under the specified file name, and rotates them after
Expand All @@ -59,7 +58,7 @@ public class DefaultAccessLogReceiver implements AccessLogReceiver, Runnable, Cl
//0 = not running
//1 = queued
//2 = running
//3 = final state of running (inside finally of run())
//3 = closing
@SuppressWarnings("unused")
private volatile int state = 0;

Expand Down Expand Up @@ -139,12 +138,14 @@ private void calculateChangeOverPoint() {

@Override
public void logMessage(final String message) {
if(closed) {
//Log handler is closing, other resources should as well, there shouldn't
//be resources served that required this to log stuff into AL file.
throw UndertowMessages.MESSAGES.failedToLogAccessOnClose();
}
this.pendingMessages.add(message);
int state = stateUpdater.get(this);
if (state == 0) {
if (stateUpdater.compareAndSet(this, 0, 1)) {
logWriteExecutor.execute(this);
}
if (stateUpdater.compareAndSet(this, 0, 1)) {
logWriteExecutor.execute(this);
}
}

Expand All @@ -153,75 +154,66 @@ public void logMessage(final String message) {
*/
@Override
public void run() {
//check if we can transition to 2. If so, perform tasks in small chunks and check this.closed.
//move into 3 if(this.closed) and terminate run()
if (!stateUpdater.compareAndSet(this, 1, 2)) {
return;
}
if (forceLogRotation) {
doRotate();
//NOTE: once we are here, run() control state transition, unless it is too slow
//and close takes over after grace period.

if (forceLogRotation || System.currentTimeMillis() > changeOverPoint) {
performFileRotation();
} else if (initialRun && Files.exists(defaultLogFile)) {
//if there is an existing log file check if it should be rotated
long lm = 0;
try {
lm = Files.getLastModifiedTime(defaultLogFile).toMillis();
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorRotatingAccessLog(e);
}
Calendar c = Calendar.getInstance();
c.setTimeInMillis(changeOverPoint);
c.add(Calendar.DATE, -1);
if (lm <= c.getTimeInMillis()) {
doRotate();
}
checkAndRotateOnInitialRun();
}
initialRun = false;
List<String> messages = new ArrayList<>();
String msg;
//only grab at most 1000 messages at a time
for (int i = 0; i < 1000; ++i) {
msg = pendingMessages.poll();
if (msg == null) {
break;
if(closed) {
if (!stateUpdater.compareAndSet(this, 2, 3)) {
UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
}
messages.add(msg);
return;
}

//only grab at most 1000 messages at a time
try {
if (!messages.isEmpty()) {
writeMessage(messages);
}
} finally {
// change this state to final state
stateUpdater.set(this, 3);
//check to see if there is still more messages
//if so then run this again
if (!pendingMessages.isEmpty() || forceLogRotation) {
if (stateUpdater.compareAndSet(this, 3, 1)) {
logWriteExecutor.execute(this);
for (int i = 0; i < 1000 && !this.closed && !pendingMessages.isEmpty(); ++i) {
final String msg = pendingMessages.peek();
if (msg == null) {
break;
}
writeMessage(msg);

// NOTE:this is very similar to remove(), but without screenNull
// at best, it will work like poll/remove, at worst, will do nothing
if (!pendingMessages.remove(msg)) {
break;
}
}
}finally {
// flush what we might have
try {
//this can happen when log has been rotated and there were no write
final BufferedWriter bw = this.writer;
if(bw != null)
bw.flush();
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
}
// Check the state before resetting the state to 0 (not running) and checking if a writer needs to be closed:
// - If state != 3 here, another thread is executing this.
// The other thread will visit here and will check if a writer needs to be closed.
// We can leave state and skip closing a writer.
// - If state == 3 here, there is no another thread executing this.
// So, update the state to 0 (not running) and check if a writer needs be closed.
if (stateUpdater.compareAndSet(this, 3, 0) && closed) {
// As close() can be invoked from another thread in parallel,
// it will dispatch a new thread to close writer if state == 0 (not running) at that moment.
// So, just in case, check the state again:
// - if state != 0, another thread has already dispatched from close() and it will visit here. So, closing writer can be skipped here.
// - if state == 0, writer can be closed here. Let's change state to 3 again in order to prevent close() from dispatching a new thread.
if (stateUpdater.compareAndSet(this, 0, 3)) {
try {
if(writer != null) {
writer.flush();
writer.close();
writer = null;
}
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
} finally {
// reset the state to 0 again finally
stateUpdater.set(this, 0);
if(this.closed) {
if (!stateUpdater.compareAndSet(this, 2, 3)) {
UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
}
return;
} else {
if (!pendingMessages.isEmpty() || forceLogRotation) {
if (stateUpdater.compareAndSet(this, 2, 1)) {
logWriteExecutor.execute(this);
} else {
UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnReschedule();
}
} else {
if (!stateUpdater.compareAndSet(this, 2, 0)) {
UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
}
}
}
Expand All @@ -243,43 +235,73 @@ void awaitWrittenForTest() throws InterruptedException {
}
}

private void writeMessage(final List<String> messages) {
if (System.currentTimeMillis() > changeOverPoint) {
doRotate();
private void writeMessage(final String message) {
//NOTE: is there a need to rotate on write?
//if (System.currentTimeMillis() > changeOverPoint) {
// performFileRotation();
//}
try {
if(!initOutput()) {
return;
}
final BufferedWriter bw = this.writer;
if(bw != null){
bw.write(message);
bw.newLine();
}
return;
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
}
}

private boolean initOutput() {
try {
if (writer == null) {
boolean created = !Files.exists(defaultLogFile);
writer = Files.newBufferedWriter(defaultLogFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
if(Files.size(defaultLogFile) == 0 && fileHeaderGenerator != null) {
if (this.writer == null) {
this.writer = Files.newBufferedWriter(defaultLogFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND,
StandardOpenOption.CREATE);
if (Files.size(defaultLogFile) == 0 && fileHeaderGenerator != null) {
String header = fileHeaderGenerator.generateHeader();
if(header != null) {
writer.write(header);
writer.newLine();
writer.flush();
if (header != null) {
this.writer.write(header);
this.writer.newLine();
this.writer.flush();
}
}
}
for (String message : messages) {
writer.write(message);
writer.newLine();
}
writer.flush();
return true;
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
return false;
}
}
private void checkAndRotateOnInitialRun() {
//if there is an existing log file check if it should be rotated
long lm = 0;
try {
lm = Files.getLastModifiedTime(defaultLogFile).toMillis();
} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorRotatingAccessLog(e);
}
Calendar c = Calendar.getInstance();
c.setTimeInMillis(changeOverPoint);
c.add(Calendar.DATE, -1);
if (lm <= c.getTimeInMillis()) {
performFileRotation();
}
initialRun = false;
}

private void doRotate() {
private void performFileRotation() {
forceLogRotation = false;
if (!rotate) {
return;
}
try {
if (writer != null) {
writer.flush();
writer.close();
writer = null;
if (this.writer != null) {
this.writer.flush();
this.writer.close();
this.writer = null;
}
if (!Files.exists(defaultLogFile)) {
return;
Expand Down Expand Up @@ -309,11 +331,64 @@ public void rotate() {
}
}

@SuppressWarnings("static-access")
@Override
public void close() throws IOException {
closed = true;
if (stateUpdater.compareAndSet(this, 0, 1)) {
logWriteExecutor.execute(this);
synchronized (this) {
if(this.closed) {
return;
}
this.closed = true;
}
if (this.stateUpdater.compareAndSet(this, 0, 3)) {
flushAndTerminate();
return;
} else {
// state[1,2] - scheduled or running, attempt schedule hijack
if (this.stateUpdater.compareAndSet(this, 1, 3)) {
//this means this thread raced against scheduled run(). run() will exist ASAP
//as 1->2 wont be possible, we are at 3 and this.closed
flushAndTerminate();
return;
}
// either failed race to 1->3 or we were in 2. We have to wait here sometime.
// wait ~1s, if situation does not clear up, try dumping stuff
for(int i=0; i<20;i++) {
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException e) {
UndertowLogger.ROOT_LOGGER.closeInterrupted(e);
break;
}
if(this.stateUpdater.get(this) == 3) {
break;
}
}
this.stateUpdater.set(this, 3);
flushAndTerminate();
}
}

protected void flushAndTerminate() {
try {
while (!this.pendingMessages.isEmpty()) {
final String msg = this.pendingMessages.poll();
// TODO: clarify this, how is this possible?
if (msg == null) {
continue;
}
writeMessage(msg);
}

this.writer.flush();
this.writer.close();
this.writer = null;

} catch (IOException e) {
UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
} finally {
//NOTE: no need, it cant be reused?
//stateUpdater.set(this, 0);
}
}

Expand Down

0 comments on commit 8ba6d4e

Please sign in to comment.