Skip to content

Commit 777eba0

Browse files
perghmichaelklishin
authored andcommitted
Refactor: Extracted RabbitMq server controller from IntegrationFixture
Simplifying IntegrationFixture. Make reuse of code for controlling RabbitMQ server simpler.
1 parent 29eb2ba commit 777eba0

File tree

1 file changed

+334
-0
lines changed

1 file changed

+334
-0
lines changed
Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at https://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
#pragma warning disable 2002
42+
43+
using System;
44+
using System.Collections.Generic;
45+
using System.Diagnostics;
46+
using System.IO;
47+
using System.Linq;
48+
using System.Text;
49+
using System.Text.RegularExpressions;
50+
using System.Threading;
51+
52+
namespace RabbitMQ.Client.Unit
53+
{
54+
public static class RabbitMqServerController
55+
{
56+
//
57+
// Shelling Out
58+
//
59+
public static Process ExecRabbitMQCtl(string args)
60+
{
61+
// Allow the path to the rabbitmqctl.bat to be set per machine
62+
string envVariable = Environment.GetEnvironmentVariable("RABBITMQ_RABBITMQCTL_PATH");
63+
string rabbitmqctlPath;
64+
65+
if (envVariable != null)
66+
{
67+
var regex = new Regex(@"^DOCKER:(?<dockerMachine>.+)$");
68+
Match match = regex.Match(envVariable);
69+
70+
if (match.Success)
71+
{
72+
return ExecRabbitMqCtlUsingDocker(args, match.Groups["dockerMachine"].Value);
73+
}
74+
else
75+
{
76+
rabbitmqctlPath = envVariable;
77+
}
78+
}
79+
else
80+
{
81+
// provided by the umbrella
82+
string umbrellaRabbitmqctlPath;
83+
// provided in PATH by a RabbitMQ installation
84+
string providedRabbitmqctlPath;
85+
86+
if (IsRunningOnMonoOrDotNetCore())
87+
{
88+
umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl";
89+
providedRabbitmqctlPath = "rabbitmqctl";
90+
}
91+
else
92+
{
93+
umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat";
94+
providedRabbitmqctlPath = "rabbitmqctl.bat";
95+
}
96+
97+
if (File.Exists(umbrellaRabbitmqctlPath))
98+
{
99+
rabbitmqctlPath = umbrellaRabbitmqctlPath;
100+
}
101+
else
102+
{
103+
rabbitmqctlPath = providedRabbitmqctlPath;
104+
}
105+
}
106+
107+
return ExecCommand(rabbitmqctlPath, args);
108+
}
109+
110+
public static Process ExecRabbitMqCtlUsingDocker(string args, string dockerMachineName)
111+
{
112+
var proc = new Process
113+
{
114+
StartInfo =
115+
{
116+
CreateNoWindow = true,
117+
UseShellExecute = false
118+
}
119+
};
120+
121+
try
122+
{
123+
proc.StartInfo.FileName = "docker";
124+
proc.StartInfo.Arguments = $"exec {dockerMachineName} rabbitmqctl {args}";
125+
proc.StartInfo.RedirectStandardError = true;
126+
proc.StartInfo.RedirectStandardOutput = true;
127+
128+
proc.Start();
129+
string stderr = proc.StandardError.ReadToEnd();
130+
proc.WaitForExit();
131+
if (stderr.Length > 0 || proc.ExitCode > 0)
132+
{
133+
string stdout = proc.StandardOutput.ReadToEnd();
134+
ReportExecFailure("rabbitmqctl", args, $"{stderr}\n{stdout}");
135+
}
136+
137+
return proc;
138+
}
139+
catch (Exception e)
140+
{
141+
ReportExecFailure("rabbitmqctl", args, e.Message);
142+
throw;
143+
}
144+
}
145+
146+
public static Process ExecCommand(string command)
147+
{
148+
return ExecCommand(command, "");
149+
}
150+
151+
public static Process ExecCommand(string command, string args)
152+
{
153+
return ExecCommand(command, args, null);
154+
}
155+
156+
public static Process ExecCommand(string ctl, string args, string changeDirTo)
157+
{
158+
var proc = new Process
159+
{
160+
StartInfo =
161+
{
162+
CreateNoWindow = true,
163+
UseShellExecute = false
164+
}
165+
};
166+
if (changeDirTo != null)
167+
{
168+
proc.StartInfo.WorkingDirectory = changeDirTo;
169+
}
170+
171+
string cmd;
172+
if (IsRunningOnMonoOrDotNetCore())
173+
{
174+
cmd = ctl;
175+
}
176+
else
177+
{
178+
cmd = "cmd.exe";
179+
args = $"/c \"\"{ctl}\" {args}\"";
180+
}
181+
182+
try
183+
{
184+
proc.StartInfo.FileName = cmd;
185+
proc.StartInfo.Arguments = args;
186+
proc.StartInfo.RedirectStandardError = true;
187+
proc.StartInfo.RedirectStandardOutput = true;
188+
189+
proc.Start();
190+
string stderr = proc.StandardError.ReadToEnd();
191+
proc.WaitForExit();
192+
if (stderr.Length > 0 || proc.ExitCode > 0)
193+
{
194+
string stdout = proc.StandardOutput.ReadToEnd();
195+
ReportExecFailure(cmd, args, $"{stderr}\n{stdout}");
196+
}
197+
198+
return proc;
199+
}
200+
catch (Exception e)
201+
{
202+
ReportExecFailure(cmd, args, e.Message);
203+
throw;
204+
}
205+
}
206+
207+
public static void ReportExecFailure(string cmd, string args, string msg)
208+
{
209+
Console.WriteLine($"Failure while running {cmd} {args}:\n{msg}");
210+
}
211+
212+
public static bool IsRunningOnMonoOrDotNetCore()
213+
{
214+
#if NETCOREAPP
215+
return true;
216+
#else
217+
return Type.GetType("Mono.Runtime") != null;
218+
#endif
219+
}
220+
221+
//
222+
// Flow Control
223+
//
224+
225+
public static void Block(IConnection conn, Encoding encoding)
226+
{
227+
ExecRabbitMQCtl("set_vm_memory_high_watermark 0.000000001");
228+
// give rabbitmqctl some time to do its job
229+
Thread.Sleep(1200);
230+
Publish(conn, encoding);
231+
}
232+
233+
public static void Publish(IConnection conn, Encoding encoding)
234+
{
235+
IModel ch = conn.CreateModel();
236+
ch.BasicPublish("amq.fanout", "", null, encoding.GetBytes("message"));
237+
}
238+
239+
240+
public static void Unblock()
241+
{
242+
ExecRabbitMQCtl("set_vm_memory_high_watermark 0.4");
243+
}
244+
245+
private static readonly Regex GetConnectionName = new Regex(@"\{""connection_name"",""(?<connection_name>[^""]+)""\}");
246+
public class ConnectionInfo
247+
{
248+
public string Pid
249+
{
250+
get; set;
251+
}
252+
253+
public string Name
254+
{
255+
get; set;
256+
}
257+
258+
public ConnectionInfo(string pid, string name)
259+
{
260+
Pid = pid;
261+
Name = name;
262+
}
263+
264+
public override string ToString()
265+
{
266+
return $"pid = {Pid}, name: {Name}";
267+
}
268+
}
269+
public static List<ConnectionInfo> ListConnections()
270+
{
271+
Process proc = ExecRabbitMQCtl("list_connections --silent pid client_properties");
272+
string stdout = proc.StandardOutput.ReadToEnd();
273+
274+
try
275+
{
276+
// {Environment.NewLine} is not sufficient
277+
string[] splitOn = new string[] { "\r\n", "\n" };
278+
string[] lines = stdout.Split(splitOn, StringSplitOptions.RemoveEmptyEntries);
279+
// line: <[email protected]> {.../*client_properties*/...}
280+
return lines.Select(s =>
281+
{
282+
string[] columns = s.Split('\t');
283+
Debug.Assert(!string.IsNullOrEmpty(columns[0]), "columns[0] is null or empty!");
284+
Debug.Assert(!string.IsNullOrEmpty(columns[1]), "columns[1] is null or empty!");
285+
Match match = GetConnectionName.Match(columns[1]);
286+
Debug.Assert(match.Success, "columns[1] is not in expected format.");
287+
return new ConnectionInfo(columns[0], match.Groups["connection_name"].Value);
288+
}).ToList();
289+
}
290+
catch (Exception)
291+
{
292+
Console.WriteLine($"Bad response from rabbitmqctl list_connections --silent pid client_properties{Environment.NewLine}{stdout}");
293+
throw;
294+
}
295+
}
296+
297+
public static void CloseConnection(IConnection conn)
298+
{
299+
ConnectionInfo ci = ListConnections().First(x => conn.ClientProvidedName == x.Name);
300+
CloseConnection(ci.Pid);
301+
}
302+
303+
public static void CloseAllConnections()
304+
{
305+
List<ConnectionInfo> cs = ListConnections();
306+
foreach (ConnectionInfo c in cs)
307+
{
308+
CloseConnection(c.Pid);
309+
}
310+
}
311+
312+
public static void CloseConnection(string pid)
313+
{
314+
ExecRabbitMQCtl($"close_connection \"{pid}\" \"Closed via rabbitmqctl\"");
315+
}
316+
317+
public static void RestartRabbitMQ()
318+
{
319+
StopRabbitMQ();
320+
Thread.Sleep(500);
321+
StartRabbitMQ();
322+
}
323+
324+
public static void StopRabbitMQ()
325+
{
326+
ExecRabbitMQCtl("stop_app");
327+
}
328+
329+
public static void StartRabbitMQ()
330+
{
331+
ExecRabbitMQCtl("start_app");
332+
}
333+
}
334+
}

0 commit comments

Comments
 (0)