polario.main

Main CLI interface

 1"""Main CLI interface
 2"""
 3import argparse
 4import json
 5import sys
 6from enum import Enum
 7from functools import reduce
 8from pathlib import Path
 9from pprint import pprint
10
11import polars as pl
12
13from polario import __version__
14
15
class Command(Enum):
    """Sub-commands understood by the CLI.

    Each member's value is the literal word the user types as the ``cmd``
    positional argument; ``main`` builds the argparse ``choices`` list from
    these values.
    """

    # Print each input Parquet file as a DataFrame.
    SHOW = "show"
    # Pretty-print the schema of each input Parquet file.
    SCHEMA = "schema"
    # Dump the head() rows of each input Parquet file as indented JSON.
    JSON_HEAD = "json_head"
    # Dump every row of each input Parquet file as one compact JSON line.
    JSONL = "jsonl"
    # Concatenate the input CSV files and write them out as one Parquet file.
    CONCAT_CSV = "concat_csv"
    # Convert each input Parquet file to a CSV file.
    WRITE_CSV = "write_csv"
23
24
def main() -> int:
    """Run the polario command-line tool.

    Reads the command and the input paths from the process arguments
    (``argparse`` falls back to ``sys.argv``) and executes the command:

    * ``concat_csv``: read every input as CSV with ``infer_schema_length=0``
      (polars then reads every column as a string), concatenate the frames,
      print the result and write it to ``<stem of first input>.parquet`` in
      the current working directory.
    * ``show``: print each Parquet file as a DataFrame.
    * ``schema``: pretty-print the schema of each Parquet file.
    * ``json_head``: write ``DataFrame.head()`` of each Parquet file to
      stdout as indented JSON.
    * ``jsonl``: write every row of each Parquet file to stdout as one
      compact JSON object per line.
    * ``write_csv``: write each Parquet file as CSV into the current working
      directory, using the input file name with its suffix replaced by
      ``.csv``; outputs that already exist are reported and skipped.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: if the ``concat_csv`` output file already exists, or if
            an input path of any other command is a directory.
    """
    parser = argparse.ArgumentParser(
        description="polario library commandline tool to inspect Parquet files"
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    parser.add_argument(
        "cmd",
        choices=[c.value for c in Command],
        help="command to run",
    )
    parser.add_argument(
        "paths",
        metavar="PATH",
        type=Path,
        nargs="+",
        help="input paths",
    )
    args = parser.parse_args()
    cmd = Command(args.cmd)

    if cmd == Command.CONCAT_CSV:
        output_filename = Path(args.paths[0].stem + ".parquet")
        # Fail fast: refuse to overwrite before spending time reading inputs.
        if output_filename.exists():
            raise ValueError(f"Output file {output_filename} already exists")
        # One concat over all frames instead of re-concatenating the growing
        # result once per input file. nargs="+" guarantees a non-empty list.
        df = pl.concat(
            [pl.read_csv(csv_path, infer_schema_length=0) for csv_path in args.paths]
        )
        print(df)
        print("Writing to", output_filename)
        df.write_parquet(output_filename)
        return 0

    paths: list[Path] = args.paths
    for path in paths:
        if path.is_dir():
            raise ValueError(
                "Input path must be a file. File an issue if you want dataset support."
            )
        df = pl.read_parquet(path, use_pyarrow=True)
        if cmd == Command.SHOW:
            print(df)
        elif cmd == Command.SCHEMA:
            pprint(df.schema)
        elif cmd == Command.JSON_HEAD:
            # default=str is deliberate: Parquet columns routinely hold dates,
            # decimals, etc. that json cannot encode, and this tool only
            # displays values, so show their string form instead of crashing.
            json.dump(df.head().to_dicts(), sys.stdout, indent=2, default=str)
            # Terminate the document so output for several files (and the
            # shell prompt afterwards) does not run together.
            sys.stdout.write("\n")
        elif cmd == Command.JSONL:
            for row in df.to_dicts():
                # Same deliberate string fallback as for json_head.
                json.dump(row, sys.stdout, separators=(",", ":"), default=str)
                sys.stdout.write("\n")
        elif cmd == Command.WRITE_CSV:
            output_path = Path(path.name).with_suffix(".csv")
            if output_path.exists():
                # Never overwrite; report and move on to the next input.
                print(f"Output file {output_path} already exists")
                continue
            df.write_csv(output_path)
    return 0
87
88
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
class Command(enum.Enum):
class Command(Enum):
    """Sub-commands understood by the CLI.

    Each member's value is the literal word the user types as the ``cmd``
    positional argument; ``main`` builds the argparse ``choices`` list from
    these values.
    """

    # Print each input Parquet file as a DataFrame.
    SHOW = "show"
    # Pretty-print the schema of each input Parquet file.
    SCHEMA = "schema"
    # Dump the head() rows of each input Parquet file as indented JSON.
    JSON_HEAD = "json_head"
    # Dump every row of each input Parquet file as one compact JSON line.
    JSONL = "jsonl"
    # Concatenate the input CSV files and write them out as one Parquet file.
    CONCAT_CSV = "concat_csv"
    # Convert each input Parquet file to a CSV file.
    WRITE_CSV = "write_csv"

An enumeration.

SHOW = <Command.SHOW: 'show'>
SCHEMA = <Command.SCHEMA: 'schema'>
JSON_HEAD = <Command.JSON_HEAD: 'json_head'>
JSONL = <Command.JSONL: 'jsonl'>
CONCAT_CSV = <Command.CONCAT_CSV: 'concat_csv'>
WRITE_CSV = <Command.WRITE_CSV: 'write_csv'>
Inherited Members
enum.Enum
name
value
def main() -> int:
def main() -> int:
    """Run the polario command-line tool.

    Reads the command and the input paths from the process arguments
    (``argparse`` falls back to ``sys.argv``) and executes the command:

    * ``concat_csv``: read every input as CSV with ``infer_schema_length=0``
      (polars then reads every column as a string), concatenate the frames,
      print the result and write it to ``<stem of first input>.parquet`` in
      the current working directory.
    * ``show``: print each Parquet file as a DataFrame.
    * ``schema``: pretty-print the schema of each Parquet file.
    * ``json_head``: write ``DataFrame.head()`` of each Parquet file to
      stdout as indented JSON.
    * ``jsonl``: write every row of each Parquet file to stdout as one
      compact JSON object per line.
    * ``write_csv``: write each Parquet file as CSV into the current working
      directory, using the input file name with its suffix replaced by
      ``.csv``; outputs that already exist are reported and skipped.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: if the ``concat_csv`` output file already exists, or if
            an input path of any other command is a directory.
    """
    parser = argparse.ArgumentParser(
        description="polario library commandline tool to inspect Parquet files"
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    parser.add_argument(
        "cmd",
        choices=[c.value for c in Command],
        help="command to run",
    )
    parser.add_argument(
        "paths",
        metavar="PATH",
        type=Path,
        nargs="+",
        help="input paths",
    )
    args = parser.parse_args()
    cmd = Command(args.cmd)

    if cmd == Command.CONCAT_CSV:
        output_filename = Path(args.paths[0].stem + ".parquet")
        # Fail fast: refuse to overwrite before spending time reading inputs.
        if output_filename.exists():
            raise ValueError(f"Output file {output_filename} already exists")
        # One concat over all frames instead of re-concatenating the growing
        # result once per input file. nargs="+" guarantees a non-empty list.
        df = pl.concat(
            [pl.read_csv(csv_path, infer_schema_length=0) for csv_path in args.paths]
        )
        print(df)
        print("Writing to", output_filename)
        df.write_parquet(output_filename)
        return 0

    paths: list[Path] = args.paths
    for path in paths:
        if path.is_dir():
            raise ValueError(
                "Input path must be a file. File an issue if you want dataset support."
            )
        df = pl.read_parquet(path, use_pyarrow=True)
        if cmd == Command.SHOW:
            print(df)
        elif cmd == Command.SCHEMA:
            pprint(df.schema)
        elif cmd == Command.JSON_HEAD:
            # default=str is deliberate: Parquet columns routinely hold dates,
            # decimals, etc. that json cannot encode, and this tool only
            # displays values, so show their string form instead of crashing.
            json.dump(df.head().to_dicts(), sys.stdout, indent=2, default=str)
            # Terminate the document so output for several files (and the
            # shell prompt afterwards) does not run together.
            sys.stdout.write("\n")
        elif cmd == Command.JSONL:
            for row in df.to_dicts():
                # Same deliberate string fallback as for json_head.
                json.dump(row, sys.stdout, separators=(",", ":"), default=str)
                sys.stdout.write("\n")
        elif cmd == Command.WRITE_CSV:
            output_path = Path(path.name).with_suffix(".csv")
            if output_path.exists():
                # Never overwrite; report and move on to the next input.
                print(f"Output file {output_path} already exists")
                continue
            df.write_csv(output_path)
    return 0

Main CLI interface