5
5
import os
6
6
from shutil import copyfile
7
7
from typing import Optional
8
- import zlib
8
+ import gzip
9
+ import tempfile
9
10
import pytest
10
11
import responses
11
12
import requests
@@ -488,15 +489,37 @@ def test_gzip_compression():
488
489
service .set_enable_gzip_compression (True )
489
490
assert service .get_enable_gzip_compression ()
490
491
prepped = service .prepare_request ('GET' , url = '' , data = json .dumps ({"foo" : "bar" }))
491
- assert prepped ['data' ] == zlib .compress (b'{"foo": "bar"}' )
492
+ assert prepped ['data' ] == gzip .compress (b'{"foo": "bar"}' )
492
493
assert prepped ['headers' ].get ('content-encoding' ) == 'gzip'
493
494
494
495
# Should return compressed data when gzip is on for non-json data
495
496
assert service .get_enable_gzip_compression ()
496
497
prepped = service .prepare_request ('GET' , url = '' , data = b'rawdata' )
497
- assert prepped ['data' ] == zlib .compress (b'rawdata' )
498
+ assert prepped ['data' ] == gzip .compress (b'rawdata' )
498
499
assert prepped ['headers' ].get ('content-encoding' ) == 'gzip'
499
500
501
+ # Should return compressed data when gzip is on for gzip file data
502
+ assert service .get_enable_gzip_compression ()
503
+ with tempfile .TemporaryFile (mode = 'w+b' ) as t_f :
504
+ with gzip .GzipFile (mode = 'wb' , fileobj = t_f ) as gz_f :
505
+ gz_f .write (json .dumps ({"foo" : "bar" }).encode ())
506
+ with gzip .GzipFile (mode = 'rb' , fileobj = t_f ) as gz_f :
507
+ gzip_data = gz_f .read ()
508
+ prepped = service .prepare_request ('GET' , url = '' , data = gzip_data )
509
+ assert prepped ['data' ] == gzip .compress (t_f .read ())
510
+ assert prepped ['headers' ].get ('content-encoding' ) == 'gzip'
511
+
512
+ # Should return compressed json data when gzip is on for gzip file json data
513
+ assert service .get_enable_gzip_compression ()
514
+ with tempfile .TemporaryFile (mode = 'w+b' ) as t_f :
515
+ with gzip .GzipFile (mode = 'wb' , fileobj = t_f ) as gz_f :
516
+ gz_f .write ("rawdata" .encode ())
517
+ with gzip .GzipFile (mode = 'rb' , fileobj = t_f ) as gz_f :
518
+ gzip_data = gz_f .read ()
519
+ prepped = service .prepare_request ('GET' , url = '' , data = gzip_data )
520
+ assert prepped ['data' ] == gzip .compress (t_f .read ())
521
+ assert prepped ['headers' ].get ('content-encoding' ) == 'gzip'
522
+
500
523
# Should return uncompressed data when content-encoding is set
501
524
assert service .get_enable_gzip_compression ()
502
525
prepped = service .prepare_request ('GET' , url = '' , headers = {"content-encoding" : "gzip" },
@@ -513,7 +536,7 @@ def test_gzip_compression_external():
513
536
assert service .service_url == 'https://mockurl'
514
537
assert service .get_enable_gzip_compression () is True
515
538
prepped = service .prepare_request ('GET' , url = '' , data = json .dumps ({"foo" : "bar" }))
516
- assert prepped ['data' ] == zlib .compress (b'{"foo": "bar"}' )
539
+ assert prepped ['data' ] == gzip .compress (b'{"foo": "bar"}' )
517
540
assert prepped ['headers' ].get ('content-encoding' ) == 'gzip'
518
541
519
542
@responses .activate
0 commit comments