@@ -128,17 +128,93 @@ def test_send__reconnects_on_dirty_conn(self):
128
128
self .assertEqual (socket .create_connection .call_count , 1 )
129
129
130
130
131
- @unittest2 .skip ("Not Implemented" )
132
131
def test_send__failure_sets_dirty_connection (self ):
133
- pass
132
+ fake_config = {
133
+ 'host' : 'localhost' ,
134
+ 'port' : 9090 ,
135
+ 'request_id' : 0 ,
136
+ 'payload' : 'test data'
137
+ }
138
+
139
+ def raise_error (* args ):
140
+ raise socket .error
141
+
142
+ # Get a connection (with socket mocked)
143
+ assert socket .create_connection is self .MockCreateConn
144
+ conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
145
+
146
+ assert isinstance (conn ._sock , mock .Mock )
147
+ conn ._sock .sendall .side_effect = raise_error
148
+ try :
149
+ conn .send (fake_config ['request_id' ], fake_config ['payload' ])
150
+ except ConnectionError :
151
+ self .assertEquals (conn ._dirty , True )
134
152
135
- @unittest2 .skip ("Not Implemented" )
136
153
def test_recv (self ):
137
- pass
154
+ fake_config = {
155
+ 'host' : 'localhost' ,
156
+ 'port' : 9090 ,
157
+ 'request_id' : 0 ,
158
+ 'payload' : 'some test data' ,
159
+ }
160
+
161
+ # Get a connection
162
+ assert socket .create_connection is self .MockCreateConn
163
+ conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
164
+
165
+ # A function to mock _read_bytes
166
+ conn ._mock_sent_size = False
167
+ conn ._mock_data_sent = 0
168
+ def mock_socket_recv (num_bytes ):
169
+ if not conn ._mock_sent_size :
170
+ assert num_bytes == 4
171
+ conn ._mock_sent_size = True
172
+ return struct .pack ('>i' , len (fake_config ['payload' ]))
173
+
174
+ recv_data = struct .pack ('>%ds' % num_bytes , fake_config ['payload' ][conn ._mock_data_sent :conn ._mock_data_sent + num_bytes ])
175
+ conn ._mock_data_sent += num_bytes
176
+ return recv_data
177
+
178
+ with mock .patch .object (conn , '_read_bytes' , new = mock_socket_recv ):
179
+ self .assertEquals (conn .recv (fake_config ['request_id' ]), fake_config ['payload' ])
138
180
139
- @unittest2 .skip ("Not Implemented" )
140
181
def test_recv__reconnects_on_dirty_conn (self ):
141
- pass
182
+ fake_config = {
183
+ 'host' : 'localhost' ,
184
+ 'port' : 9090 ,
185
+ 'request_id' : 0 ,
186
+ 'payload' : 'some test data' ,
187
+ }
188
+
189
+ # Get a connection
190
+ assert socket .create_connection is self .MockCreateConn
191
+ conn = KafkaConnection (fake_config ['host' ], fake_config ['port' ])
192
+
193
+ # Dirty it
194
+ try :
195
+ conn ._raise_connection_error ()
196
+ except ConnectionError :
197
+ pass
198
+
199
+ # Reset the socket call counts
200
+ socket .create_connection .reset_mock ()
201
+ self .assertEqual (socket .create_connection .call_count , 0 )
202
+
203
+ # Now test that recv'ing attempts to reconnect
204
+ conn ._sock .recv .return_value = fake_config ['payload' ]
205
+ conn ._read_bytes (len (fake_config ['payload' ]))
206
+ self .assertEqual (socket .create_connection .call_count , 1 )
207
+
208
+ # A second way to dirty it...
209
+ conn .close ()
210
+
211
+ # Reset the socket call counts
212
+ socket .create_connection .reset_mock ()
213
+ self .assertEqual (socket .create_connection .call_count , 0 )
214
+
215
+ # Now test that recv'ing attempts to reconnect
216
+ conn ._read_bytes (len (fake_config ['payload' ]))
217
+ self .assertEqual (socket .create_connection .call_count , 1 )
142
218
143
219
@unittest2 .skip ("Not Implemented" )
144
220
def test_recv__failure_sets_dirty_connection (self ):
0 commit comments