32
32
#pragma warning disable 2002
33
33
34
34
using System ;
35
- using System . Collections . Generic ;
36
35
using System . Diagnostics ;
37
36
using System . IO ;
38
- using System . Linq ;
39
37
using System . Text ;
40
38
using System . Text . RegularExpressions ;
41
39
using System . Threading ;
42
40
43
41
namespace RabbitMQ . Client . Unit
44
42
{
43
+ #nullable enable
45
44
public static class RabbitMQCtl
46
45
{
47
- //
48
- // Shelling Out
49
- //
50
- private static Process ExecRabbitMQCtl ( string args )
46
+ private static readonly Func < string , Process > s_invokeRabbitMqCtl = GetRabbitMqCtlInvokeAction ( ) ;
47
+
48
+ private static Func < string , Process > GetRabbitMqCtlInvokeAction ( )
51
49
{
52
- // Allow the path to the rabbitmqctl.bat to be set per machine
53
- string envVariable = Environment . GetEnvironmentVariable ( "RABBITMQ_RABBITMQCTL_PATH" ) ;
54
- string rabbitmqctlPath ;
50
+ string precomputedArguments ;
51
+ string ? envVariable = Environment . GetEnvironmentVariable ( "RABBITMQ_RABBITMQCTL_PATH" ) ;
55
52
56
- if ( envVariable != null )
53
+ if ( envVariable is not null )
57
54
{
58
- var regex = new Regex ( @"^DOCKER:(?<dockerMachine>.+)$" ) ;
59
- Match match = regex . Match ( envVariable ) ;
60
-
61
- if ( match . Success )
55
+ const string DockerPrefix = "DOCKER:" ;
56
+ if ( envVariable . StartsWith ( DockerPrefix ) )
62
57
{
63
- return ExecRabbitMQCtlUsingDocker ( args , match . Groups [ "dockerMachine" ] . Value ) ;
64
- }
65
- else
66
- {
67
- rabbitmqctlPath = envVariable ;
58
+ // Call docker
59
+ precomputedArguments = $ "exec { envVariable . Substring ( DockerPrefix . Length ) } rabbitmqctl ";
60
+ return args => CreateProcess ( "docker" , precomputedArguments + args ) ;
68
61
}
62
+
63
+ // call the path from the env var
64
+ return args => CreateProcess ( envVariable , args ) ;
65
+ }
66
+
67
+ // Try default
68
+ string umbrellaRabbitmqctlPath ;
69
+ string providedRabbitmqctlPath ;
70
+
71
+ if ( IsRunningOnMonoOrDotNetCore ( ) )
72
+ {
73
+ umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl" ;
74
+ providedRabbitmqctlPath = "rabbitmqctl" ;
69
75
}
70
76
else
71
77
{
72
- // provided by the umbrella
73
- string umbrellaRabbitmqctlPath ;
74
- // provided in PATH by a RabbitMQ installation
75
- string providedRabbitmqctlPath ;
78
+ umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat" ;
79
+ providedRabbitmqctlPath = "rabbitmqctl.bat" ;
80
+ }
76
81
77
- if ( IsRunningOnMonoOrDotNetCore ( ) )
78
- {
79
- umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl" ;
80
- providedRabbitmqctlPath = "rabbitmqctl" ;
81
- }
82
- else
83
- {
84
- umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat" ;
85
- providedRabbitmqctlPath = "rabbitmqctl.bat" ;
86
- }
82
+ string path = File . Exists ( umbrellaRabbitmqctlPath ) ? umbrellaRabbitmqctlPath : providedRabbitmqctlPath ;
87
83
88
- if ( File . Exists ( umbrellaRabbitmqctlPath ) )
89
- {
90
- rabbitmqctlPath = umbrellaRabbitmqctlPath ;
91
- }
92
- else
93
- {
94
- rabbitmqctlPath = providedRabbitmqctlPath ;
95
- }
84
+ if ( IsRunningOnMonoOrDotNetCore ( ) )
85
+ {
86
+ return args => CreateProcess ( path , args ) ;
96
87
}
97
88
98
- return ExecCommand ( rabbitmqctlPath , args ) ;
89
+ precomputedArguments = $ "/c \" \" { path } \" ";
90
+ return args => CreateProcess ( "cmd.exe" , precomputedArguments + args ) ;
99
91
}
100
92
101
- private static Process ExecRabbitMQCtlUsingDocker ( string args , string dockerMachineName )
93
+ //
94
+ // Shelling Out
95
+ //
96
+ private static string ExecRabbitMQCtl ( string args )
102
97
{
103
- var proc = new Process
104
- {
105
- StartInfo =
106
- {
107
- CreateNoWindow = true ,
108
- UseShellExecute = false
109
- }
110
- } ;
111
-
112
98
try
113
99
{
114
- proc . StartInfo . FileName = "docker" ;
115
- proc . StartInfo . Arguments = $ "exec { dockerMachineName } rabbitmqctl { args } ";
116
- proc . StartInfo . RedirectStandardError = true ;
117
- proc . StartInfo . RedirectStandardOutput = true ;
100
+ using var process = s_invokeRabbitMqCtl ( args ) ;
101
+ process . Start ( ) ;
102
+ process . WaitForExit ( ) ;
103
+ string stderr = process . StandardError . ReadToEnd ( ) ;
104
+ string stdout = process . StandardOutput . ReadToEnd ( ) ;
118
105
119
- proc . Start ( ) ;
120
- string stderr = proc . StandardError . ReadToEnd ( ) ;
121
- proc . WaitForExit ( ) ;
122
- if ( stderr . Length > 0 || proc . ExitCode > 0 )
106
+ if ( stderr . Length > 0 || process . ExitCode > 0 )
123
107
{
124
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
125
108
ReportExecFailure ( "rabbitmqctl" , args , $ "{ stderr } \n { stdout } ") ;
126
109
}
127
110
128
- return proc ;
111
+ return stdout ;
129
112
}
130
113
catch ( Exception e )
131
114
{
@@ -134,60 +117,21 @@ private static Process ExecRabbitMQCtlUsingDocker(string args, string dockerMach
134
117
}
135
118
}
136
119
137
- private static Process ExecCommand ( string command , string args )
138
- {
139
- return ExecCommand ( command , args , null ) ;
140
- }
141
-
142
- private static Process ExecCommand ( string ctl , string args , string changeDirTo )
120
+ private static Process CreateProcess ( string cmd , string arguments , string ? workDirectory = null )
143
121
{
144
- var proc = new Process
122
+ return new Process
145
123
{
146
124
StartInfo =
147
125
{
148
126
CreateNoWindow = true ,
149
- UseShellExecute = false
127
+ UseShellExecute = false ,
128
+ RedirectStandardError = true ,
129
+ RedirectStandardOutput = true ,
130
+ FileName = cmd ,
131
+ Arguments = arguments ,
132
+ WorkingDirectory = workDirectory
150
133
}
151
134
} ;
152
- if ( changeDirTo != null )
153
- {
154
- proc . StartInfo . WorkingDirectory = changeDirTo ;
155
- }
156
-
157
- string cmd ;
158
- if ( IsRunningOnMonoOrDotNetCore ( ) )
159
- {
160
- cmd = ctl ;
161
- }
162
- else
163
- {
164
- cmd = "cmd.exe" ;
165
- args = $ "/c \" \" { ctl } \" { args } \" ";
166
- }
167
-
168
- try
169
- {
170
- proc . StartInfo . FileName = cmd ;
171
- proc . StartInfo . Arguments = args ;
172
- proc . StartInfo . RedirectStandardError = true ;
173
- proc . StartInfo . RedirectStandardOutput = true ;
174
-
175
- proc . Start ( ) ;
176
- string stderr = proc . StandardError . ReadToEnd ( ) ;
177
- proc . WaitForExit ( ) ;
178
- if ( stderr . Length > 0 || proc . ExitCode > 0 )
179
- {
180
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
181
- ReportExecFailure ( cmd , args , $ "{ stderr } \n { stdout } ") ;
182
- }
183
-
184
- return proc ;
185
- }
186
- catch ( Exception e )
187
- {
188
- ReportExecFailure ( cmd , args , e . Message ) ;
189
- throw ;
190
- }
191
135
}
192
136
193
137
private static void ReportExecFailure ( string cmd , string args , string msg )
@@ -207,7 +151,6 @@ private static bool IsRunningOnMonoOrDotNetCore()
207
151
//
208
152
// Flow Control
209
153
//
210
-
211
154
public static void Block ( IConnection conn , Encoding encoding )
212
155
{
213
156
ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.000000001" ) ;
@@ -222,88 +165,51 @@ public static void Publish(IConnection conn, Encoding encoding)
222
165
ch . BasicPublish ( "amq.fanout" , "" , encoding . GetBytes ( "message" ) ) ;
223
166
}
224
167
225
-
226
168
public static void Unblock ( )
227
169
{
228
170
ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.4" ) ;
229
171
}
230
172
231
- private static readonly Regex s_getConnectionProperties = new Regex ( @"(?<pid>.*)\s\[.*\""connection_name\"",\""(?<connection_name>.*?)\"".*\]" , RegexOptions . Compiled ) ;
232
- public class ConnectionInfo
173
+ public static void CloseConnection ( IConnection conn )
233
174
{
234
- public string Pid
235
- {
236
- get ; set ;
237
- }
238
-
239
- public string Name
240
- {
241
- get ; set ;
242
- }
243
-
244
- public ConnectionInfo ( string pid , string name )
245
- {
246
- Pid = pid ;
247
- Name = name ;
248
- }
249
-
250
- public override string ToString ( )
251
- {
252
- return $ "pid = { Pid } , name: { Name } ";
253
- }
175
+ CloseConnection ( GetConnectionPid ( conn . ClientProvidedName ) ) ;
254
176
}
255
177
256
- public static List < ConnectionInfo > ListConnections ( )
178
+ private static readonly Regex s_getConnectionProperties = new Regex ( @"^(?<pid><[^>]*>)\s\[.*""connection_name"",""(?<connection_name>[^""]*)"".*\]$" , RegexOptions . Multiline | RegexOptions . Compiled ) ;
179
+ private static string GetConnectionPid ( string connectionName )
257
180
{
258
- Process proc = ExecRabbitMQCtl ( "list_connections --silent pid client_properties" ) ;
259
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
181
+ string stdout = ExecRabbitMQCtl ( "list_connections --silent pid client_properties" ) ;
260
182
261
- try
183
+ var match = s_getConnectionProperties . Match ( stdout ) ;
184
+ while ( match . Success )
262
185
{
263
- // {Environment.NewLine} is not sufficient
264
- var matches = s_getConnectionProperties . Matches ( stdout ) ;
265
- if ( matches . Count > 0 )
186
+ if ( match . Groups [ "connection_name" ] . Value == connectionName )
266
187
{
267
- var list = new List < ConnectionInfo > ( matches . Count ) ;
268
- for ( int i = 0 ; i < matches . Count ; i ++ )
269
- {
270
- var s = matches [ i ] ;
271
- Debug . Assert ( s . Success , "Unable to parse connection list." ) ;
272
- Debug . Assert ( s . Groups . ContainsKey ( "pid" ) , "Unable to parse pid from {stdout}" ) ;
273
- Debug . Assert ( s . Groups . ContainsKey ( "connection_name" ) , "Unable to parse connection_name from {stdout}" ) ;
274
- list . Add ( new ConnectionInfo ( s . Groups [ "pid" ] . Value , s . Groups [ "connection_name" ] . Value ) ) ;
275
- }
276
-
277
- return list ;
188
+ return match . Groups [ "pid" ] . Value ;
278
189
}
279
190
280
- return null ;
281
- }
282
- catch ( Exception )
283
- {
284
- Console . WriteLine ( $ "Bad response from rabbitmqctl list_connections --silent pid client_properties{ Environment . NewLine } { stdout } ") ;
285
- throw ;
191
+ match = match . NextMatch ( ) ;
286
192
}
193
+
194
+ throw new Exception ( $ "No connection found with name: { connectionName } ") ;
287
195
}
288
196
289
- public static void CloseConnection ( IConnection conn )
197
+ private static void CloseConnection ( string pid )
290
198
{
291
- ConnectionInfo ci = ListConnections ( ) . First ( x => conn . ClientProvidedName == x . Name ) ;
292
- CloseConnection ( ci . Pid ) ;
199
+ ExecRabbitMQCtl ( $ "close_connection \" { pid } \" \" Closed via rabbitmqctl\" ") ;
293
200
}
294
201
295
202
public static void CloseAllConnections ( )
296
203
{
297
- List < ConnectionInfo > cs = ListConnections ( ) ;
298
- foreach ( ConnectionInfo c in cs )
204
+ foreach ( var pid in EnumerateConnectionsPid ( ) )
299
205
{
300
- CloseConnection ( c . Pid ) ;
206
+ CloseConnection ( pid ) ;
301
207
}
302
208
}
303
209
304
- public static void CloseConnection ( string pid )
210
+ private static string [ ] EnumerateConnectionsPid ( )
305
211
{
306
- ExecRabbitMQCtl ( $ "close_connection \" { pid } \" \" Closed via rabbitmqctl \" " ) ;
212
+ return ExecRabbitMQCtl ( "list_connections --silent pid" ) . Split ( ' \n ' , StringSplitOptions . RemoveEmptyEntries ) ;
307
213
}
308
214
309
215
public static void RestartRabbitMQ ( )
0 commit comments