Skip to content

Commit

Permalink
refactor thread aborting to use tasks and cancellation tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
hahn-kev committed Apr 11, 2024
1 parent 04e1e28 commit 759605e
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 248 deletions.
28 changes: 22 additions & 6 deletions src/ChorusHub/Advertiser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Chorus.ChorusHub;

namespace ChorusHub
{
public class Advertiser : IDisposable
{
private Thread _thread;
private CancellationTokenSource _cancellationTokenSource;
private UdpClient _client;
private IPEndPoint _endPoint;
private byte[] _sendBytes;
Expand All @@ -33,30 +35,44 @@ public void Start()
EnableBroadcast = true
};
_endPoint = new IPEndPoint(IPAddress.Parse("255.255.255.255"), Port);
_cancellationTokenSource = new CancellationTokenSource();
_thread = new Thread(Work);
_thread.Start();
}

private void Work()
{
bool cancelled = false;
try
{
while (true)
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
UpdateAdvertisementBasedOnCurrentIpAddress();
_client.BeginSend(_sendBytes, _sendBytes.Length, _endPoint, SendCallback, _client);
Thread.Sleep(1000);
_client.BeginSend(_sendBytes,
_sendBytes.Length,
_endPoint,
SendCallback,
_client);
Task.Delay(1000).Wait(_cancellationTokenSource.Token);
}
}
catch (OperationCanceledException)
{
cancelled = true;
}
catch(ThreadAbortException)
{
//Progress.WriteVerbose("Advertiser Thread Aborting (that's normal)");
_client.Close();
cancelled = true;
}
catch(Exception)
{
//EventLog.WriteEntry("Application", string.Format("Error in Advertiser: {0}", error.Message), EventLogEntryType.Error);
}

if (_cancellationTokenSource.Token.IsCancellationRequested && cancelled)
{
_client.Close();
}
}

public static void SendCallback(IAsyncResult args)
Expand Down Expand Up @@ -104,7 +120,7 @@ public void Stop()
return;

//EventLog.WriteEntry("Application", "Advertiser Stopping...", EventLogEntryType.Information);
_thread.Abort();
_cancellationTokenSource.Cancel();
_thread.Join(2 * 1000);
_thread = null;
}
Expand Down
125 changes: 0 additions & 125 deletions src/LibChorus/ProcessStream.cs

This file was deleted.

93 changes: 52 additions & 41 deletions src/LibChorus/Utilities/HgProcessOutputReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,47 +47,46 @@ public string StandardError
}

/// <summary>
/// Safely read the streams of the process
/// Safely read the streams of the process, must redirect stdErr, out and in before calling
/// </summary>
/// <returns>true if the process completed before the timeout or cancellation</returns>
public bool Read(ref Process process, int secondsBeforeTimeOut, IProgress progress)
{
var outputReaderArgs = new ReaderArgs() {Proc = process, Reader = process.StandardOutput};
if (process.StartInfo.RedirectStandardOutput)
{
_outputReader = new Thread(new ParameterizedThreadStart(ReadStream));
_outputReader.Start(outputReaderArgs);
}
var errorReaderArgs = new ReaderArgs() { Proc = process, Reader = process.StandardError };
if (process.StartInfo.RedirectStandardError)
{
_errorReader = new Thread(new ParameterizedThreadStart(ReadStream));
_errorReader.Start(errorReaderArgs);
}
var cts = new CancellationTokenSource();
var outputReaderArgs = new ReaderArgs(process, process.StandardOutput, cts.Token);
_outputReader = new Thread(ReadStream);
_outputReader.Start(outputReaderArgs);


var errorReaderArgs = new ReaderArgs(process, process.StandardError, cts.Token);
_errorReader = new Thread(ReadStream);
_errorReader.Start(errorReaderArgs);


lock(this)
{
_heartbeat = DateTime.Now;
}

//nb: at one point I (jh) tried adding !process.HasExited, but that made things less stable.
while (/*!process.HasExited &&*/ (_outputReader.ThreadState == ThreadState.Running || (_errorReader != null && _errorReader.ThreadState == ThreadState.Running)))
while ( _outputReader.ThreadState != ThreadState.Stopped && _errorReader.ThreadState != ThreadState.Stopped)
{
DateTime end;
lock (this)
{
end = _heartbeat.AddSeconds(secondsBeforeTimeOut);
}
if(progress.CancelRequested)

if (progress.CancelRequested)
{
cts.Cancel();
return false;
}

Thread.Sleep(100);
if (DateTime.Now > end)
{
if (_outputReader != null)
_outputReader.Abort();
if (_errorReader != null)
_errorReader.Abort();
cts.Cancel();
return false;
}
}
Expand Down Expand Up @@ -155,34 +154,46 @@ private bool HandleChangedVsDeletedFiles(string line, StreamWriter standardInput
private void ReadStream(object args)
{
var result = new StringBuilder();
var readerArgs = args as ReaderArgs;

var reader = readerArgs.Reader;
do
var readerArgs = (ReaderArgs)args;
try
{
var s = reader.ReadLine();
if (s != null)
{
// Eat up any heartbeat lines from the stream, also remove warnings about dotencode
if (s != Properties.Resources.MergeHeartbeat && s != DotEncodeWarning
var reader = readerArgs.Reader;
do
{
var s = reader.ReadLineAsync(readerArgs.Token).Result;
if (s == null) return; //cancelled
// Eat up any heartbeat lines from the stream, also remove warnings about dotencode
if (s != Properties.Resources.MergeHeartbeat
&& s != DotEncodeWarning
&& !HandleChangedVsDeletedFiles(s, readerArgs.Proc.StandardInput))
{
result.AppendLine(s.Trim());
}
lock (this)
{
// set the last heartbeat if data was read from the stream
_heartbeat = DateTime.Now;
}
}
} while (!reader.EndOfStream);// && !readerArgs.Proc.HasExited);

readerArgs.Results = result.ToString().Replace("\r\n", "\n");
{
result.AppendLine(s.Trim());
}

lock (this)
{
// set the last heartbeat if data was read from the stream
_heartbeat = DateTime.Now;
}
} while (!reader.EndOfStream && !readerArgs.Token.IsCancellationRequested); // && !readerArgs.Proc.HasExited);
}
finally
{
readerArgs.Results = result.ToString().Replace("\r\n", "\n");
}
}
}

class ReaderArgs
internal class ReaderArgs
{
public ReaderArgs(Process proc, StreamReader reader, CancellationToken token)
{
Token = token;
Reader = reader;
Proc = proc;
}

public CancellationToken Token;
public StreamReader Reader;
public Process Proc;
public string Results;
Expand Down
Loading

0 comments on commit 759605e

Please sign in to comment.