polario.delta_dataset

 1from typing import Literal, Optional
 2from urllib.parse import urlsplit
 3
 4import fsspec
 5import polars as pl
 6
 7
 8class DeltaDataset:
 9    """Dataset based on deltatable storage"""
10
11    def __init__(self, url: str, partition_columns: list[str] = []):
12        self.url = url.rstrip("/")
13        # Load fsspec filesystem from url scheme
14        location = urlsplit(url)
15        self.fs = fsspec.filesystem(location.scheme)
16        self.partition_columns = partition_columns
17
18    def append(self, df: pl.DataFrame) -> None:
19        self._write_delta(df, mode="append")
20
21    def write(self, df: pl.DataFrame) -> None:
22        self._write_delta(df, mode="overwrite")
23
24    def _write_delta(
25        self, df: pl.DataFrame, mode: Literal["append", "overwrite"]
26    ) -> None:
27        if not set(df.columns).issuperset(set(self.partition_columns)) or len(
28            df.columns
29        ) <= len(self.partition_columns):
30            raise ValueError(
31                f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}"
32            )
33        df.write_delta(
34            self.url,
35            mode=mode,
36            delta_write_options={"partition_by": self.partition_columns},
37        )
38
39    def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame:
40        if set(partition_column_values.keys()) != set(self.partition_columns):
41            raise ValueError(
42                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
43            )
44
45        return pl.read_delta(
46            self.url,
47            pyarrow_options={
48                "partitions": [
49                    (pcol, "=", pval) for pcol, pval in partition_column_values.items()
50                ]
51            },
52        )
53
54    def scan(self) -> Optional[pl.LazyFrame]:
55        try:
56            return pl.scan_delta(self.url)
57        except Exception as e:
58            if ".TableNotFoundError" in str(type(e)):
59                return None
60            raise e
class DeltaDataset:
    """A delta-table backed dataset with a fixed set of partition columns."""

    def __init__(self, url: str, partition_columns: list[str] = []):
        """Remember the table location and resolve a filesystem for its scheme."""
        self.url = url.rstrip("/")
        # The URL scheme decides which fsspec implementation we talk to.
        self.fs = fsspec.filesystem(urlsplit(url).scheme)
        self.partition_columns = partition_columns

    def append(self, df: pl.DataFrame) -> None:
        """Add the rows of ``df`` to the existing table."""
        self._write_delta(df, mode="append")

    def write(self, df: pl.DataFrame) -> None:
        """Overwrite the table with the rows of ``df``."""
        self._write_delta(df, mode="overwrite")

    def _write_delta(
        self, df: pl.DataFrame, mode: Literal["append", "overwrite"]
    ) -> None:
        """Write ``df``, first checking that it fits the partition scheme."""
        present = set(df.columns)
        required = set(self.partition_columns)
        has_all_partitions = present.issuperset(required)
        has_payload = len(df.columns) > len(self.partition_columns)
        if not (has_all_partitions and has_payload):
            raise ValueError(
                f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}"
            )
        df.write_delta(
            self.url,
            mode=mode,
            delta_write_options={"partition_by": self.partition_columns},
        )

    def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame:
        """Load the single partition selected by ``partition_column_values``."""
        if set(self.partition_columns) != set(partition_column_values):
            raise ValueError(
                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
            )
        filters = [
            (column, "=", value)
            for column, value in partition_column_values.items()
        ]
        return pl.read_delta(self.url, pyarrow_options={"partitions": filters})

    def scan(self) -> Optional[pl.LazyFrame]:
        """Return a lazy frame over the table, or None if it is absent."""
        try:
            return pl.scan_delta(self.url)
        except Exception as e:
            if ".TableNotFoundError" not in str(type(e)):
                raise e
            return None

Dataset based on deltatable storage

DeltaDataset(url: str, partition_columns: list[str] = [])
def __init__(self, url: str, partition_columns: list[str] = []):
    """Store the normalized table url, its partition columns, and a filesystem."""
    self.url = url.rstrip("/")
    self.partition_columns = partition_columns
    # Pick the fsspec backend that matches the url's scheme.
    parsed = urlsplit(url)
    self.fs = fsspec.filesystem(parsed.scheme)
def append(self, df: pl.DataFrame) -> None:
    """Write ``df`` into the table in append mode."""
    self._write_delta(df, mode="append")
def write(self, df: pl.DataFrame) -> None:
    """Write ``df`` into the table, replacing any existing contents."""
    self._write_delta(df, mode="overwrite")
def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame:
    """Return the partition whose column values equal ``partition_column_values``."""
    expected = set(self.partition_columns)
    given = set(partition_column_values.keys())
    if given != expected:
        raise ValueError(
            f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
        )
    partition_filters = [
        (name, "=", val) for name, val in partition_column_values.items()
    ]
    return pl.read_delta(self.url, pyarrow_options={"partitions": partition_filters})
def scan(self) -> Optional[pl.LazyFrame]:
    """Lazy scan of the table; None when no delta table exists at the url."""
    try:
        return pl.scan_delta(self.url)
    except Exception as exc:
        # A missing table maps to None; anything else propagates.
        if ".TableNotFoundError" in str(type(exc)):
            return None
        raise exc