import unittest
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import random
from test.test_support import precisionbigmemtest, _1G, _4G
import sys

try:
    import mmap
except ImportError:
    mmap = None

zlib = import_module('zlib')


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam',  0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam',  (2**31)))


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION, and zlib apparently
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
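        # The format string below renders getrandbits(8 * _1M) as 2 * _1M hex
        # digits, which a2b_hex() turns back into _1M bytes of random data.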
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
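        # Compression level 1 (Z_BEST_SPEED) keeps this bigmem test reasonably fast.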
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) unconsumed_tail should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
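            # Feed empty input to drain any remaining buffered output
            # from the decompressor.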
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) unconsumed_tail should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
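            # Cap each decompressed chunk at roughly 10% of the remaining
            # compressed input (at least 1 byte); input that is not consumed
            # yet is returned via unconsumed_tail.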
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = "x\x9cKLJ\x06\x00\x02M\x01"     # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, "")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
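    # Degenerate cases: emit everything in a single block when the requested
    # length is smaller than the step, or the step itself is too small.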
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()