Skip to content

Refactor: extracted RabbitMQ node management functions from IntegrationFixture #884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 11 additions & 230 deletions projects/Unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,13 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;

using NUnit.Framework;

using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Framing.Impl;
using static RabbitMQ.Client.Unit.RabbitMQCtl;

namespace RabbitMQ.Client.Unit
{
Expand All @@ -62,7 +58,6 @@ public class IntegrationFixture
internal IConnectionFactory ConnFactory;
internal IConnection Conn;
internal IModel Model;

internal Encoding encoding = new UTF8Encoding();
public static TimeSpan RECOVERY_INTERVAL = TimeSpan.FromSeconds(2);

Expand Down Expand Up @@ -420,276 +415,62 @@ internal void WaitOn(object o)
}
}

//
// Shelling Out
//

internal Process ExecRabbitMQCtl(string args)
{
// Allow the path to the rabbitmqctl.bat to be set per machine
string envVariable = Environment.GetEnvironmentVariable("RABBITMQ_RABBITMQCTL_PATH");
string rabbitmqctlPath;

if (envVariable != null)
{
var regex = new Regex(@"^DOCKER:(?<dockerMachine>.+)$");
Match match = regex.Match(envVariable);

if (match.Success)
{
return ExecRabbitMqCtlUsingDocker(args, match.Groups["dockerMachine"].Value);
} else {
rabbitmqctlPath = envVariable;
}
}
else
{
// provided by the umbrella
string umbrellaRabbitmqctlPath;
// provided in PATH by a RabbitMQ installation
string providedRabbitmqctlPath;

if (IsRunningOnMonoOrDotNetCore())
{
umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl";
providedRabbitmqctlPath = "rabbitmqctl";
} else {
umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat";
providedRabbitmqctlPath = "rabbitmqctl.bat";
}

if (File.Exists(umbrellaRabbitmqctlPath)) {
rabbitmqctlPath = umbrellaRabbitmqctlPath;
} else {
rabbitmqctlPath = providedRabbitmqctlPath;
}
}

return ExecCommand(rabbitmqctlPath, args);
}

private Process ExecRabbitMqCtlUsingDocker(string args, string dockerMachineName)
{
var proc = new Process
{
StartInfo =
{
CreateNoWindow = true,
UseShellExecute = false
}
};

try {
proc.StartInfo.FileName = "docker";
proc.StartInfo.Arguments = $"exec {dockerMachineName} rabbitmqctl {args}";
proc.StartInfo.RedirectStandardError = true;
proc.StartInfo.RedirectStandardOutput = true;

proc.Start();
string stderr = proc.StandardError.ReadToEnd();
proc.WaitForExit();
if (stderr.Length > 0 || proc.ExitCode > 0)
{
string stdout = proc.StandardOutput.ReadToEnd();
ReportExecFailure("rabbitmqctl", args, $"{stderr}\n{stdout}");
}

return proc;
}
catch (Exception e)
{
ReportExecFailure("rabbitmqctl", args, e.Message);
throw;
}
}

internal Process ExecCommand(string command)
{
return ExecCommand(command, "");
}

internal Process ExecCommand(string command, string args)
{
return ExecCommand(command, args, null);
}

internal Process ExecCommand(string ctl, string args, string changeDirTo)
{
var proc = new Process
{
StartInfo =
{
CreateNoWindow = true,
UseShellExecute = false
}
};
if(changeDirTo != null)
{
proc.StartInfo.WorkingDirectory = changeDirTo;
}

string cmd;
if(IsRunningOnMonoOrDotNetCore()) {
cmd = ctl;
} else {
cmd = "cmd.exe";
args = $"/c \"\"{ctl}\" {args}\"";
}

try {
proc.StartInfo.FileName = cmd;
proc.StartInfo.Arguments = args;
proc.StartInfo.RedirectStandardError = true;
proc.StartInfo.RedirectStandardOutput = true;

proc.Start();
string stderr = proc.StandardError.ReadToEnd();
proc.WaitForExit();
if (stderr.Length > 0 || proc.ExitCode > 0)
{
string stdout = proc.StandardOutput.ReadToEnd();
ReportExecFailure(cmd, args, $"{stderr}\n{stdout}");
}

return proc;
}
catch (Exception e)
{
ReportExecFailure(cmd, args, e.Message);
throw;
}
}

internal void ReportExecFailure(string cmd, string args, string msg)
{
Console.WriteLine($"Failure while running {cmd} {args}:\n{msg}");
}

public static bool IsRunningOnMonoOrDotNetCore()
{
#if NETCOREAPP
return true;
#else
return Type.GetType("Mono.Runtime") != null;
#endif
}

//
// Flow Control
//

internal void Block()
{
ExecRabbitMQCtl("set_vm_memory_high_watermark 0.000000001");
// give rabbitmqctl some time to do its job
Thread.Sleep(1200);
Publish(Conn);
RabbitMQCtl.Block(Conn, encoding);
}

internal void Unblock()
{
ExecRabbitMQCtl("set_vm_memory_high_watermark 0.4");
RabbitMQCtl.Unblock();
}

internal void Publish(IConnection conn)
{
IModel ch = conn.CreateModel();
ch.BasicPublish("amq.fanout", "", null, encoding.GetBytes("message"));
RabbitMQCtl.Publish(conn, encoding);
}

//
// Connection Closure
//

public class ConnectionInfo
{
public string Pid
{
get; set;
}

public string Name
{
get; set;
}

public ConnectionInfo(string pid, string name)
{
Pid = pid;
Name = name;
}

public override string ToString()
{
return $"pid = {Pid}, name: {Name}";
}
}

private static readonly Regex GetConnectionName = new Regex(@"\{""connection_name"",""(?<connection_name>[^""]+)""\}");

internal List<ConnectionInfo> ListConnections()
{
Process proc = ExecRabbitMQCtl("list_connections --silent pid client_properties");
string stdout = proc.StandardOutput.ReadToEnd();

try
{
// {Environment.NewLine} is not sufficient
string[] splitOn = new string[] { "\r\n", "\n" };
string[] lines = stdout.Split(splitOn, StringSplitOptions.RemoveEmptyEntries);
// line: <[email protected]> {.../*client_properties*/...}
return lines.Select(s =>
{
string[] columns = s.Split('\t');
Debug.Assert(!string.IsNullOrEmpty(columns[0]), "columns[0] is null or empty!");
Debug.Assert(!string.IsNullOrEmpty(columns[1]), "columns[1] is null or empty!");
Match match = GetConnectionName.Match(columns[1]);
Debug.Assert(match.Success, "columns[1] is not in expected format.");
return new ConnectionInfo(columns[0], match.Groups["connection_name"].Value);
}).ToList();
}
catch (Exception)
{
Console.WriteLine($"Bad response from rabbitmqctl list_connections --silent pid client_properties{Environment.NewLine}{stdout}");
throw;
}
return RabbitMQCtl.ListConnections();
}

internal void CloseConnection(IConnection conn)
{
ConnectionInfo ci = ListConnections().First(x => conn.ClientProvidedName == x.Name);
CloseConnection(ci.Pid);
RabbitMQCtl.CloseConnection(conn);
}

internal void CloseAllConnections()
{
List<ConnectionInfo> cs = ListConnections();
foreach(ConnectionInfo c in cs)
{
CloseConnection(c.Pid);
}
RabbitMQCtl.CloseAllConnections();
}

internal void CloseConnection(string pid)
{
ExecRabbitMQCtl($"close_connection \"{pid}\" \"Closed via rabbitmqctl\"");
RabbitMQCtl.CloseConnection(pid);
}

internal void RestartRabbitMQ()
{
StopRabbitMQ();
Thread.Sleep(500);
StartRabbitMQ();
RabbitMQCtl.RestartRabbitMQ();
}

internal void StopRabbitMQ()
{
ExecRabbitMQCtl("stop_app");
RabbitMQCtl.StopRabbitMQ();
}

internal void StartRabbitMQ()
{
ExecRabbitMQCtl("start_app");
RabbitMQCtl.StartRabbitMQ();
}

//
Expand Down
Loading