#!/usr/bin/env python3 import os import sys import tempfile import contextlib from pathlib import Path from typing import Sequence, Any, List, Tuple, Dict, Literal, get_args from collections import defaultdict import click from atomicwrites import atomic_write # type: ignore[import] from pura.jsonfast import loads, dumps PrinterType = Literal["stderr", "table", "json"] class Printer: def __init__(self, printer_type: PrinterType) -> None: self.printer_type: PrinterType = printer_type self.messages: List[Tuple[str, str]] = [] def print(self, msg: Tuple[str, str]) -> None: if self.printer_type == "stderr": click.echo(f"json-compress: {msg[0]} {msg[1]}", err=True) elif self.printer_type == "json": click.echo(dumps({"file": msg[0], "msg": msg[1]})) elif self.printer_type == "table": self.messages.append(msg) else: raise ValueError(f"Invalid printer type: {self.printer_type}") def flush(self) -> None: if self.printer_type == "table": if not self.messages: return from tabulate import tabulate click.echo( str(tabulate(self.messages, headers=("File", "Status"), tablefmt="rst")) ) @click.command() @click.option( "--check/--no-check", show_default=True, help="if compressing wouldn't change the size, dont write to the file", is_flag=True, default=True, ) @click.option( "-o", "--output-fmt", type=click.Choice(get_args(PrinterType)), help="Output format", default="stderr", ) @click.argument( "PATH", nargs=-1, type=click.Path(allow_dash=True, exists=True, path_type=Path), required=True, ) def main(check: bool, output_fmt: PrinterType, path: Sequence[Path]) -> None: """ pass files to compress, or '-' from STDIN """ data: Any printer = Printer(output_fmt) print_sizes: bool = "-" not in {str(p) for p in path} sizes: Dict[Path, List[int]] = defaultdict(list) for p in path: if str(p) == "-": data = sys.stdin.read() else: data = p.read_text() sizes[p].append(p.stat().st_size) data = loads(data) compressed: str = dumps(data, minified=True) if str(p) == "-": click.echo(compressed) else: if check: with tempfile.NamedTemporaryFile("w") as temp: temp.write(compressed) temp.flush() tmp_size = os.stat(temp.name).st_size if tmp_size == sizes[p][0]: if print_sizes: printer.print((str(p), "size remained the same")) continue with atomic_write(p, overwrite=True) as f: f.write(compressed) sizes[p].append(p.stat().st_size) if print_sizes: before = sizes[p][0] after = sizes[p][1] diff = before - after if diff == 0: printer.print((str(p), "remained the same")) else: perc = round(diff / before * 100, 4) printer.print((str(p), f"reduced by {perc}%")) printer.flush() if __name__ == "__main__": with contextlib.suppress(KeyboardInterrupt): main()