11
11
12
12
class ConnTest (unittest2 .TestCase ):
13
13
def setUp (self ):
14
+ self .config = {
15
+ 'host' : 'localhost' ,
16
+ 'port' : 9090 ,
17
+ 'request_id' : 0 ,
18
+ 'payload' : 'test data'
19
+ }
20
+
14
21
# Mocking socket.create_connection will cause _sock to always be a
15
22
# MagicMock()
16
23
patcher = mock .patch ('socket.create_connection' , spec = True )
@@ -20,6 +27,9 @@ def setUp(self):
20
27
self .MockCreateConn ().sendall .return_value = None
21
28
self .addCleanup (patcher .stop )
22
29
30
+ self .conn = KafkaConnection (self .config ['host' ], self .config ['port' ])
31
+ socket .create_connection .reset_mock ()
32
+
23
33
def test_collect_hosts__happy_path (self ):
24
34
hosts = "localhost:1234,localhost"
25
35
results = collect_hosts (hosts )
@@ -52,168 +62,105 @@ def test_collect_hosts__with_spaces(self):
52
62
]))
53
63
54
64
def test_send (self ):
55
- fake_config = {
56
- 'host' : 'localhost' ,
57
- 'port' : 9090 ,
58
- 'request_id' : 0 ,
59
- 'payload' : 'test data'
60
- }
61
-
62
- assert socket .create_connection is self .MockCreateConn
63
- conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
64
- socket .create_connection .reset_mock ()
65
- conn .send (fake_config ['request_id' ], fake_config ['payload' ])
66
- conn ._sock .sendall .assert_called_with (fake_config ['payload' ])
65
+ self .conn .send (self .config ['request_id' ], self .config ['payload' ])
66
+ self .conn ._sock .sendall .assert_called_with (self .config ['payload' ])
67
67
68
68
def test_init_creates_socket_connection (self ):
69
- fake_config = {
70
- 'host' : 'localhost' ,
71
- 'port' : 9090 ,
72
- }
73
-
74
- assert socket .create_connection is self .MockCreateConn
75
- socket .create_connection .reset_mock ()
76
- KafkaConnection (fake_config ['host' ], fake_config ['port' ])
77
- socket .create_connection .assert_called_with ((fake_config ['host' ], fake_config ['port' ]), DEFAULT_SOCKET_TIMEOUT_SECONDS )
69
+ KafkaConnection (self .config ['host' ], self .config ['port' ])
70
+ socket .create_connection .assert_called_with ((self .config ['host' ], self .config ['port' ]), DEFAULT_SOCKET_TIMEOUT_SECONDS )
78
71
79
72
def test_init_failure_raises_connection_error (self ):
80
- fake_config = {
81
- 'host' : 'localhost' ,
82
- 'port' : 9090 ,
83
- }
84
73
85
74
def raise_error (* args ):
86
75
raise socket .error
87
76
88
77
assert socket .create_connection is self .MockCreateConn
89
78
socket .create_connection .side_effect = raise_error
90
79
with self .assertRaises (ConnectionError ):
91
- KafkaConnection (fake_config ['host' ], fake_config ['port' ])
80
+ KafkaConnection (self . config ['host' ], self . config ['port' ])
92
81
93
82
def test_send__reconnects_on_dirty_conn (self ):
94
- fake_config = {
95
- 'host' : 'localhost' ,
96
- 'port' : 9090 ,
97
- 'request_id' : 0 ,
98
- 'payload' : 'test data'
99
- }
100
83
101
- # Get a connection (with socket mocked)
102
- assert socket .create_connection is self .MockCreateConn
103
- conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
104
-
105
- # Dirty it
84
+ # Dirty the connection
85
+ assert self .conn ._dirty is False
106
86
try :
107
- conn ._raise_connection_error ()
87
+ self . conn ._raise_connection_error ()
108
88
except ConnectionError :
109
89
pass
110
-
111
- # Reset the socket call counts
112
- socket .create_connection .reset_mock ()
113
- self .assertEqual (socket .create_connection .call_count , 0 )
90
+ assert self .conn ._dirty is True
114
91
115
92
# Now test that sending attempts to reconnect
116
- conn .send (fake_config ['request_id' ], fake_config ['payload' ])
93
+ self .assertEqual (socket .create_connection .call_count , 0 )
94
+ self .conn .send (self .config ['request_id' ], self .config ['payload' ])
117
95
self .assertEqual (socket .create_connection .call_count , 1 )
118
96
119
97
# A second way to dirty it...
120
- conn .close ()
98
+ self . conn .close ()
121
99
122
100
# Reset the socket call counts
123
101
socket .create_connection .reset_mock ()
124
102
self .assertEqual (socket .create_connection .call_count , 0 )
125
103
126
104
# Now test that sending attempts to reconnect
127
- conn .send (fake_config ['request_id' ], fake_config ['payload' ])
105
+ self . conn .send (self . config ['request_id' ], self . config ['payload' ])
128
106
self .assertEqual (socket .create_connection .call_count , 1 )
129
107
130
-
131
108
def test_send__failure_sets_dirty_connection (self ):
132
- fake_config = {
133
- 'host' : 'localhost' ,
134
- 'port' : 9090 ,
135
- 'request_id' : 0 ,
136
- 'payload' : 'test data'
137
- }
138
109
139
110
def raise_error (* args ):
140
111
raise socket .error
141
112
142
- # Get a connection (with socket mocked)
143
- assert socket .create_connection is self .MockCreateConn
144
- conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
113
+ assert self .conn ._dirty is False
145
114
146
- assert isinstance (conn ._sock , mock .Mock )
147
- conn ._sock .sendall .side_effect = raise_error
115
+ assert isinstance (self . conn ._sock , mock .Mock )
116
+ self . conn ._sock .sendall .side_effect = raise_error
148
117
try :
149
- conn .send (fake_config ['request_id' ], fake_config ['payload' ])
118
+ self . conn .send (self . config ['request_id' ], self . config ['payload' ])
150
119
except ConnectionError :
151
- self .assertEquals (conn ._dirty , True )
120
+ self .assertEquals (self . conn ._dirty , True )
152
121
153
122
def test_recv (self ):
154
- fake_config = {
155
- 'host' : 'localhost' ,
156
- 'port' : 9090 ,
157
- 'request_id' : 0 ,
158
- 'payload' : 'some test data' ,
159
- }
160
-
161
- # Get a connection
162
- assert socket .create_connection is self .MockCreateConn
163
- conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
164
123
165
124
# A function to mock _read_bytes
166
- conn ._mock_sent_size = False
167
- conn ._mock_data_sent = 0
125
+ self . conn ._mock_sent_size = False
126
+ self . conn ._mock_data_sent = 0
168
127
def mock_socket_recv (num_bytes ):
169
- if not conn ._mock_sent_size :
128
+ if not self . conn ._mock_sent_size :
170
129
assert num_bytes == 4
171
- conn ._mock_sent_size = True
172
- return struct .pack ('>i' , len (fake_config ['payload' ]))
130
+ self . conn ._mock_sent_size = True
131
+ return struct .pack ('>i' , len (self . config ['payload' ]))
173
132
174
- recv_data = struct .pack ('>%ds' % num_bytes , fake_config ['payload' ][conn ._mock_data_sent :conn ._mock_data_sent + num_bytes ])
175
- conn ._mock_data_sent += num_bytes
133
+ recv_data = struct .pack ('>%ds' % num_bytes , self . config ['payload' ][self . conn ._mock_data_sent :self . conn ._mock_data_sent + num_bytes ])
134
+ self . conn ._mock_data_sent += num_bytes
176
135
return recv_data
177
136
178
- with mock .patch .object (conn , '_read_bytes' , new = mock_socket_recv ):
179
- self .assertEquals (conn .recv (fake_config ['request_id' ]), fake_config ['payload' ])
137
+ with mock .patch .object (self . conn , '_read_bytes' , new = mock_socket_recv ):
138
+ self .assertEquals (self . conn .recv (self . config ['request_id' ]), self . config ['payload' ])
180
139
181
140
def test_recv__reconnects_on_dirty_conn (self ):
182
- fake_config = {
183
- 'host' : 'localhost' ,
184
- 'port' : 9090 ,
185
- 'request_id' : 0 ,
186
- 'payload' : 'some test data' ,
187
- }
188
141
189
- # Get a connection
190
- assert socket .create_connection is self .MockCreateConn
191
- conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
192
-
193
- # Dirty it
142
+ # Dirty the connection
194
143
try :
195
- conn ._raise_connection_error ()
144
+ self . conn ._raise_connection_error ()
196
145
except ConnectionError :
197
146
pass
198
-
199
- # Reset the socket call counts
200
- socket .create_connection .reset_mock ()
201
- self .assertEqual (socket .create_connection .call_count , 0 )
147
+ assert self .conn ._dirty is True
202
148
203
149
# Now test that recv'ing attempts to reconnect
204
- conn ._sock .recv .return_value = fake_config ['payload' ]
205
- conn ._read_bytes (len (fake_config ['payload' ]))
150
+ self .assertEqual (socket .create_connection .call_count , 0 )
151
+ self .conn ._sock .recv .return_value = self .config ['payload' ]
152
+ self .conn ._read_bytes (len (self .config ['payload' ]))
206
153
self .assertEqual (socket .create_connection .call_count , 1 )
207
154
208
155
# A second way to dirty it...
209
- conn .close ()
156
+ self . conn .close ()
210
157
211
158
# Reset the socket call counts
212
159
socket .create_connection .reset_mock ()
213
160
self .assertEqual (socket .create_connection .call_count , 0 )
214
161
215
162
# Now test that recv'ing attempts to reconnect
216
- conn ._read_bytes (len (fake_config ['payload' ]))
163
+ self . conn ._read_bytes (len (self . config ['payload' ]))
217
164
self .assertEqual (socket .create_connection .call_count , 1 )
218
165
219
166
@unittest2 .skip ("Not Implemented" )
0 commit comments