compress-file/main.py

181 lines
5.1 KiB
Python

#!/usr/bin/env python3
import heapq
import uuid
import os
import shutil
from flask import Flask, current_app, request, send_file
# Flask application object; the @app.route decorators in this file register
# their handlers on it, and main() starts it
app = Flask(__name__)
# Directory where uploaded inputs and compressed outputs are stored; init()
# deletes and recreates it
FILES_DIR = "./files/"
def init():
    """Reset storage: delete FILES_DIR with its contents if present, then create it empty"""
    directory_present = os.path.exists(FILES_DIR)
    if directory_present:
        shutil.rmtree(FILES_DIR)
    os.mkdir(FILES_DIR)
def main():
    """Reset the storage directory, then run the app on 0.0.0.0:8888 with debug off"""
    init()
    server_options = {"host": "0.0.0.0", "debug": False, "port": 8888}
    app.run(**server_options)
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
REST API
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
@app.route("/", methods=["GET"])
def index():
    """Serve the static file index.html for GET /"""
    page = current_app.send_static_file('index.html')
    return page
@app.route("/api/upload_file", methods=["POST"])
def upload_file():
    """
    Store the uploaded file, compress it, and report its size before and after

    Expects the upload in the request's 'file' field. Returns
    ("File required", 400) when that field is missing; otherwise returns a
    dict with file_id, original_size and compressed_size together with
    status 200. The uploaded input is deleted once compression has been
    attempted; only the compressed output is kept.
    """
    if 'file' not in request.files:
        return "File required", 400
    file_id = save_file(request.files['file'])
    input_path = get_input_path(file_id)
    try:
        compression_info = compress(file_id)
    finally:
        # The raw upload is only needed while compressing. Remove it even when
        # compress() raises so failed requests do not leave files behind; the
        # existence check keeps a cleanup error from masking the real one
        if os.path.exists(input_path):
            os.remove(input_path)
    response = {
        "file_id": file_id,
        "original_size": compression_info.input_size,
        "compressed_size": compression_info.output_size,
    }
    return response, 200
@app.route("/api/download_file/<file_id>", methods=["GET"])
def download_file(file_id):
    """
    Send back the compressed file stored for FILE_ID

    Returns ("File not found", 404) when no compressed file exists for
    FILE_ID, rather than letting the missing file surface as an unhandled
    error.
    """
    # Make the path absolute so the lookup is anchored to the current working
    # directory, the same base the relative FILES_DIR paths are written under
    output_path = os.path.abspath(get_output_path(file_id))
    if not os.path.isfile(output_path):
        return "File not found", 404
    return send_file(output_path)
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
FILE I/O
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
class CompressionInfo():
    """Pair of sizes describing one compression run: INPUT_SIZE and OUTPUT_SIZE"""

    def __init__(self, input_size: int, output_size: int):
        # Stored exactly as given; no validation or conversion is applied
        self.input_size, self.output_size = input_size, output_size
def save_file(file) -> str:
    """
    Save FILE and return its corresponding ID

    FILE must provide a save(path) method. The ID is a freshly generated
    random UUID in its string form.
    """
    # Convert to str so the returned ID matches the declared return type
    # instead of leaking a uuid.UUID object to callers
    file_id = str(uuid.uuid4())
    file.save(get_input_path(file_id))
    return file_id
def compress(file_id: str) -> CompressionInfo:
    """
    Compress the stored input for FILE_ID into its output path and return the
    on-disk sizes of both files
    """
    source_path, target_path = get_input_path(file_id), get_output_path(file_id)
    compress_file(source_path, target_path)
    # Sizes are read after compression, once the output file exists
    return CompressionInfo(
        os.path.getsize(source_path),
        os.path.getsize(target_path),
    )
def get_input_path(file_id) -> str:
    """Return the path under FILES_DIR used for the uploaded input of FILE_ID"""
    input_path = f"{FILES_DIR}/{file_id}-input"
    return input_path
def get_output_path(file_id) -> str:
    """Return the path under FILES_DIR used for the compressed output of FILE_ID"""
    output_path = f"{FILES_DIR}/{file_id}-compressed"
    return output_path
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
HUFFMAN CODING
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
class TreeNode:
    """
    Binary tree node holding a BYTE, a COUNT and optional LEFT/RIGHT children

    Nodes compare by COUNT alone, which is what lets them be stored in a heapq.
    """

    def __init__(self, byte, count, left = None, right = None):
        self.byte, self.count = byte, count
        self.left, self.right = left, right

    def __lt__(self, nxt):
        # Only "<" is defined; BYTE and the children take no part in ordering
        return self.count < nxt.count

    def __repr__(self):
        # Nested form: children are rendered recursively inside the brackets
        return f"{self.byte} [{self.left}_{self.right}]"
def get_codes(input_file: str) -> dict[bytes, str]:
    """
    Given INPUT_FILE, read its contents and return a dictionary containing a
    unique code for each unique byte. Uses Huffman coding

    All 256 possible byte values receive a code, including values that never
    occur in INPUT_FILE (they enter the tree with a count of 0). Keys are
    single-byte bytes objects; values are strings made of '0' and '1'.
    """
    # Get frequency of every byte in the file, reading in chunks instead of
    # issuing one read call per byte
    counts = [0] * 256
    with open(input_file, "rb") as file:
        while chunk := file.read(65536):
            for value in chunk:
                counts[value] += 1
    # Create the initial heap. Nodes are pushed one at a time in byte-value
    # order: nodes with equal counts are ordered by their heap position, so
    # the push sequence determines which codes are produced
    char_queue = []
    for value, count in enumerate(counts):
        heapq.heappush(char_queue, TreeNode(bytes([value]), count))
    # Create the tree by repeatedly merging the two lowest-count nodes
    while len(char_queue) > 1:
        left = heapq.heappop(char_queue)
        right = heapq.heappop(char_queue)
        new_node = TreeNode(None, left.count + right.count, left, right)
        heapq.heappush(char_queue, new_node)
    # Walk the tree with an explicit stack: taking a left edge appends '0' to
    # the code, taking a right edge appends '1'
    codes = {}  # {byte: code}
    stack = [(char_queue[0], '')]  # (TreeNode, code)
    while stack:
        node, code = stack.pop()
        if node.left is None and node.right is None:
            # Leaf: the accumulated path is this byte's code
            codes[node.byte] = code
            continue
        if node.left is not None:
            stack.append((node.left, code + '0'))
        if node.right is not None:
            stack.append((node.right, code + '1'))
    return codes
def to_bytes(data: str):
    """
    Pack DATA (a string of 0's and 1's) into bytes, eight bits per byte. A
    final group shorter than eight bits is padded on the right with 0's
    """
    groups = (data[start:start + 8] for start in range(0, len(data), 8))
    return bytes(int(group.ljust(8, "0"), 2) for group in groups)
def write_compressed_file(input_file: str, output_file: str, codes: dict[bytes, str]):
    """
    Compress contents of INPUT_FILE to OUTPUT_FILE using CODES

    CODES maps each single-byte bytes object to its string of 0's and 1's.
    The codes of all input bytes are concatenated in file order and packed
    into bytes with to_bytes. Raises KeyError if INPUT_FILE contains a byte
    that has no entry in CODES.
    """
    # Collect the per-byte codes in a list and join once at the end; repeated
    # string += would copy the growing bit string over and over
    pieces = []
    with open(input_file, "rb") as source:
        while chunk := source.read(65536):
            # Iterating bytes yields ints, so rebuild the one-byte key
            pieces.extend(codes[bytes([value])] for value in chunk)
    data_bytes = to_bytes("".join(pieces))
    with open(output_file, "wb") as target:
        target.write(data_bytes)
def compress_file(input_file: str, output_file: str):
    """
    Derive codes from INPUT_FILE with get_codes, then write the encoded
    contents of INPUT_FILE to OUTPUT_FILE
    """
    write_compressed_file(input_file, output_file, get_codes(input_file))
# Start the server only when this file is executed as a script
if __name__ == "__main__":
    main()