LZW Compression & Decompression
Abstract: I've been coding for years but have never tried to implement these things on my own, so I decided to do it.
Never having implemented any compression algorithm by hand feels very wrong to me, so I'll start by doing exactly that.
LZ78
The LZ78 algorithm goes as follows:
- There is a dictionary D containing found patterns & their encodings.
- There is a variable P containing the "prefix". In the beginning, the prefix is empty.
- Repeat the following as long as there are characters in the input stream:
  - Let variable C point to the most current character in the input stream.
  - Check if string P + C is in D.
    - If yes, then let P be P + C (adding C to the prefix).
    - If no:
      - Output the encoding of P to the output stream.
      - Output C as well.
      - Add the string P + C to the dictionary.
      - Let P be empty.
- If P still contains characters, output the corresponding encoding.
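The steps above can be sketched directly in Python. This is a minimal illustrative version of my own; the function name and the (code, char) pair output format are choices I made here, not part of the code later in this post:

```python
def lz78_compress(data: bytes) -> list:
    # emits (code, char) pairs; code 0 stands for the empty prefix P
    d = {}          # pattern -> code
    next_code = 1   # 0 is reserved for the empty prefix
    out, p = [], b''
    for b in data:
        c = bytes([b])
        if p + c in d:
            p = p + c                     # grow the prefix
        else:
            out.append((d.get(p, 0), c))  # encoding of P, then C itself
            d[p + c] = next_code          # learn the new pattern P + C
            next_code += 1
            p = b''
    if p:
        out.append((d[p], b''))           # leftover prefix at end of input
    return out
```

For example, `lz78_compress(b'abab')` yields `[(0, b'a'), (0, b'b'), (1, b'b')]`: the first two pairs introduce `a` and `b`, the third reuses the code for `a` and appends `b`.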
LZW
LZW is LZ78 but with a twist:
- The result only contains codes; it does not output characters directly. This is possible because:
  - The dictionary is initialized with every character in the character set, and:
  - When a mismatch happens:
    - The algorithm does not output C, and P will be set to C instead of being emptied.
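That twist can be shown with a minimal flat-dict sketch (my own illustrative version; the actual implementation below uses a trie as the dictionary instead):

```python
def lzw_compress_simple(data: bytes) -> list:
    # dictionary pre-seeded with every single byte, so raw characters
    # never need to be emitted -- only codes
    d = {bytes([x]): x for x in range(256)}
    next_code = 256
    out, p = [], b''
    for b in data:
        c = bytes([b])
        if p + c in d:
            p = p + c
        else:
            out.append(d[p])        # output only the code for P
            d[p + c] = next_code    # learn the new pattern
            next_code += 1
            p = c                   # the twist: P becomes C, not empty
    if p:
        out.append(d[p])
    return out
```

For example, `lzw_compress_simple(b'abab')` yields `[97, 98, 256]`: codes for `a` and `b`, then code 256, which was assigned to `ab` along the way.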
The code
# because indexing a bytestring in python gives you an int & it's like wtf
# have to do some special maneuvering instead
def _chr(x):
return bytes([x])
def lzw_compress(bstr):
# type TrieNode = (Code, Map Byte TrieNode)
# type Trie = Map Byte TrieNode
d = {x: (x, {}) for x in range(0, 256)}
d_counter = 256
def _gensym():
nonlocal d_counter
res = d_counter
# maximum code 12-bit
# wikipedia says that the original LZW paper encodes 8-bit into
# 12-bit codes.
if res >= 4095:
return None
d_counter += 1
return res
res = []
p = b''
def _chkpc(pc):
nonlocal d
d_pointer = d
res = True
code = -1
for c in pc:
if d_pointer.get(c):
code = d_pointer[c][0]
d_pointer = d_pointer[c][1]
else:
res = False
break
return res, code, d_pointer
for i, c in enumerate(bstr):
cc = _chr(c)
chkres, pcode, d_parent = _chkpc(p + cc)
if chkres:
p = p + cc
else:
res.append(pcode)
sym = _gensym()
if sym is not None:
d_parent[c] = (sym, {})
p = cc
chkres, pcode, d_parent = _chkpc(p)
res.append(pcode)
return res, d
def lzw_decompress(code_list):
d = {x: _chr(x) for x in range(0, 256)}
d_counter = 256
def _gensym():
nonlocal d_counter
res = d_counter
d_counter += 1
return res
cw = code_list[0]
res = [d[cw]]
pw = cw
p = b''
for i, cw in enumerate(code_list[1:]):
if cw in d:  # don't use d.get(cw) here: code 0 maps to b'\x00', which is falsy
res.append(d[cw])
p = d[pw]
c = _chr(d[cw][0])
d[_gensym()] = p+c
else:
p = d[pw]
c = _chr(d[pw][0])
res.append(p+c)
d[_gensym()] = p+c
pw = cw
return b''.join(res)
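The else branch in lzw_decompress handles the one genuinely tricky LZW case: the decoder receives a code it hasn't added to its own dictionary yet. This happens for inputs like b'aaaa', which the compressor above encodes as [97, 256, 97]. Here is a minimal flat-dict sketch of the same decoding logic (the function name is mine, not from the code above):

```python
def lzw_decompress_simple(codes: list) -> bytes:
    d = {x: bytes([x]) for x in range(256)}
    next_code = 256
    prev = d[codes[0]]
    out = [prev]
    for cw in codes[1:]:
        if cw in d:
            entry = d[cw]
        else:
            # the code isn't known yet: it can only be the entry the
            # encoder just created, i.e. prev + prev's first byte
            entry = prev + prev[:1]
        out.append(entry)
        d[next_code] = prev + entry[:1]  # mirror the encoder's dictionary
        next_code += 1
        prev = entry
    return b''.join(out)
```

Feeding it `[97, 256, 97]` reconstructs `b'aaaa'`: when code 256 arrives the decoder hasn't defined it yet, so it derives the entry from the previous string.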
Some explanations
- This part:
# type TrieNode = (Code, Map Byte TrieNode)
# type Trie = Map Byte TrieNode
d = {x: (x, {}) for x in range(0, 256)}
d_counter = 256
def _gensym():
    nonlocal d_counter
    res = d_counter
    # maximum code 12-bit
    # wikipedia says that the original LZW paper encodes 8-bit into
    # 12-bit codes.
    if res >= 4095:
        return None
    d_counter += 1
    return res
def _chkpc(pc):
    nonlocal d
    d_pointer = d
    res = True
    code = -1
    for c in pc:
        if d_pointer.get(c):
            code = d_pointer[c][0]
            d_pointer = d_pointer[c][1]
        else:
            res = False
            break
    return res, code, d_pointer
...is a trie used as the dictionary. If you're doing this in other languages (e.g. C), you might want to use other data structures (e.g. a skip list) instead. But why 4095? (With 12 bits you can have 4096 different codes, which is one more.) You'll know soon.
- This part:
sym = _gensym()
if sym is not None:
    d_parent[c] = (sym, {})
...means we are not adding any new strings into the dictionary once the capacity (4096 for 12-bit codes) is full.
The test
At first I wrote a script that randomly generates test cases & measures how well the algorithm works...
import random
import lzw_demo
def _gentest():
x = []
lenx = random.randint(4096, 16384)
for i in range(lenx):
x.append(chr(random.randint(0, 255)).encode('latin-1'))
return b''.join(x)
def _test(n):
failed_test = []
for _ in range(n):
x = _gentest()
a, _ = lzw_demo.lzw_compress(x)
xx = lzw_demo.lzw_decompress(a)
if x != xx:
failed_test.append((x, a, xx))
return failed_test
def _agg(n):
ratio_list = []
for _ in range(n):
x = _gentest()
a, _ = lzw_demo.lzw_compress(x)
ratio_list.append((len(a) * 12 / 8) / len(x))
return ratio_list
...but I quickly found out that writing the generator function like that was stupid. LZW (like pretty much all compression algorithms) assumes that repeated patterns exist in the input (so that it can shorten them), and a totally random byte stream almost certainly won't compress well.
I used a lorem ipsum generator instead. On randomly generated texts the algorithm works pretty well, getting roughly a 60% reduction in size.
The homemade lorem ipsum generator
I really don't enjoy manually using online tools, so I decided to write one myself. I was too lazy to find an English dictionary ready for processing, so I wrote a function that generates random V/CV syllables instead.
def _not_jpn_syllable_gen():
c = b'0mnpbtdkgszhrjwh'
v = b'ieaou'
return bytes([c[random.randrange(0, len(c))], v[random.randrange(0, len(v))]]).replace(b'0', b'')
...and built the rest of the generator upon this function. The magic numbers are basically me roughly estimating common Japanese texts.
def _not_jpn_word_gen():
syll_cnt = random.randrange(1, 5)
# more spaces = more chance of not generating an ending consonant
v = b'mnpbtdkgszhrjwh '
res = []
for _ in range(syll_cnt):
res.append(_not_jpn_syllable_gen())
return b''.join(res) + bytes([v[random.randrange(0, len(v))]]).replace(b' ', b'')
def _not_jpn_sentence_gen():
sentence = []
for _ in range(random.randint(1, 4)):
subsentence = []
for _ in range(random.randint(1, 15)):
subsentence.append(_not_jpn_word_gen())
sentence.append(b' '.join(subsentence))
return b', '.join(sentence)
def _not_jpn_paragraph_gen():
res = []
for _ in range(random.randint(1, 10)):
res.append(_not_jpn_sentence_gen())
return b'. '.join(res) + b'.'
Test it out a bit...
>>> _not_jpn_paragraph_gen()
b'gohesu jusepa hukasese ni kas to wop beb rapiduwo sosome miru more, buhijiho wizano domagoja dea soudume
rudoguwo me noezome pomajaza ei, kewisij, zod zirijuzi mehunaho besokis guoju ruas keposej pohege. a zomusaj
zu pigegi zuromi jinuehi mitiwo hibarana pu, bubuse tirebep siji jen woki jajapehu sodisudi hemedoji jiwezi
hipezi ta, zozo be, nadajehes gatujose bepu pihepu na. kuwuzori meose. bugewaw jo zipozu mu wosuhe bagig
hugige wuje wodizeuh worago runure hor wisop su. hozotesa dukujo peinu gojeza dipe medi zadodihi botipiwo.'
>>> _not_jpn_paragraph_gen()
b'suribi wijumo hepededu hoso zomo zasitakak utihuza moju sujibap bujonihi sadi nibob pigopumin wunu ruobena.'
>>> _not_jpn_paragraph_gen()
b'ri tomowoi a tututimi jenoo penu gib jamuhi nawesiza niwima gabeno, ti tede rahine dahunag bonao aze hupota
niheku tek, saoto kiho ha gepazima, zise zegawona hohisuo.'
>>> _not_jpn_paragraph_gen()
b'tuka benegos pa go, gajesok huhejis zukiheka sego. ke. redak tujuwe, jo jura du pejo bipadato, hoh ho
zosuge genose tuhu direba hibuki rubedi nununabo sazira zukabu ga pemuka. he geho. kodi wakonomu hegi uporu
nimazi wite, wemojedi wonanoma odobe goh za worupas gusuhopi hosozute ro tahamu kehesu, howiriju ejejitop
jipijeteg zuho aaguru jikotes mawuwu gosupa zawu bugada risiha dubiru giipa gi nat, bupu hez ja mogaka bahone
hihenehe da gutazomi. tudehum. hatoa, naga ju segi bebuheri runibuh zuboma mosin joh wejaza tanosow rohenoze
benewa suriwik zujamu semu, witogohu doso neha gepajotuh jizesib degeku huje titazopez. gojusado pi
hojuhowu uraba wog mipu nonomonir, horoh winamabo sohu posihaj rigagu du gaiha meejanu azikodi marih hetita,
jogonigi gakowehoj ga asozi dozije gupepa rodoposu rinajad zajemu kude wasuj. paw momiwiji wana pi winiwue
ta hegekuog wugibure ros pi juga.'
Oh hell, that definitely does not look like Japanese at all, but for what I need it's good enough.
The test, pt. 2
Now that we have our new generator function, let's plug it in:
def _gentest():
x = []
lenx = random.randint(10, 30)
for i in range(lenx):
x.append(_not_jpn_paragraph_gen())
return b'\n\n'.join(x)
>>> sum(_agg(1024)) / 1024
0.4271855249051684
Roughly a 57% reduction in size, slightly less than with text from lipsum.com. It's probably due to how the texts are generated: normal texts have more repeated patterns.
Wrapping it up (into a CLI tool)
Remember the 4095 above? This is where it's useful:
- Each code takes 12 bits to store, which is one and a half bytes.
- Half a byte is (somewhat) tricky to handle; it's easier to pack 2 codes together into 3 bytes.
- Imagine a situation where we have exactly one code left for packing; we need something to fill in the blank.
- If we had allowed 4095 as a valid code value, there would be situations where all the valid code values are used and we have exactly one code left. In such situations we wouldn't be able to find anything within the 12-bit range to fill in the blank, because it would be mistaken for a valid encoding of data that does not exist; by disallowing 4095 we can be sure that it's a padding code.
byte  7  6  5  4  3  2  1  0  7  6  5  4  3  2  1  0  7  6  5  4  3  2  1  0
code 11 10 09 08 07 06 05 04 03 02 01 00 11 10 09 08 07 06 05 04 03 02 01 00
part  A  A  A  A  A  A  A  A  A  A  A  A  B  B  B  B  B  B  B  B  B  B  B  B
The code for packing & de-packing:
def _pack(code_list):
if len(code_list) % 2:
code_list.append(4095)
i = 0
lenx = len(code_list)
res = []
while i < lenx:
code1 = code_list[i]
code2 = code_list[i+1]
byte1 = (code1 >> 4)
byte2 = ((code1 & 0xf) << 4) | (code2 >> 8)
byte3 = (code2 & 0xff)
res.append(byte1)
res.append(byte2)
res.append(byte3)
i += 2
return bytes(res)
def _depack(bstr):
res = []
i = 0
lenx = len(bstr)
while i < lenx:
piece = bstr[i:i+3]
code1 = (piece[0] << 4) | (piece[1] >> 4)
code2 = ((piece[1] & 0xf) << 8) | piece[2]
res.append(code1)
res.append(code2)
i += 3
return res
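The diagram boils down to a few lines of bit arithmetic. A single-pair version for illustration (the helper names here are mine):

```python
def pack_pair(code1: int, code2: int) -> bytes:
    # two 12-bit codes -> three bytes, same layout as the diagram above:
    # byte1 = high 8 bits of code1, byte2 = low 4 of code1 + high 4 of code2,
    # byte3 = low 8 bits of code2
    return bytes([code1 >> 4,
                  ((code1 & 0xf) << 4) | (code2 >> 8),
                  code2 & 0xff])

def unpack_triplet(b: bytes) -> tuple:
    return ((b[0] << 4) | (b[1] >> 4),
            ((b[1] & 0xf) << 8) | b[2])

# round trip, including the reserved 4095 padding code:
assert pack_pair(300, 4095) == bytes([18, 207, 255])
assert unpack_triplet(pack_pair(300, 4095)) == (300, 4095)
```

Because codes never exceed 4094, a trailing 4095 after unpacking can only mean padding, which is exactly what the CLI below checks for.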
The rest is simple:
import sys
if len(sys.argv) < 3:
print('Usage: lzw_demo [compress|decompress] [filename]')
elif sys.argv[1] == 'compress':
with open(sys.argv[2], 'rb') as f:
data = f.read()
res, _ = lzw_compress(data)
with open(f'{sys.argv[2]}.lzw', 'wb') as f:
f.write(_pack(res))
elif sys.argv[1] == 'decompress':
with open(sys.argv[2], 'rb') as f:
data = f.read()
unpacked = _depack(data)
if unpacked[-1] == 4095:
unpacked.pop()
res = lzw_decompress(unpacked)
with open(f'{sys.argv[2]}.lzw_dec', 'wb') as f:
f.write(res)