polario.delta_dataset
"""Dataset abstraction over Delta Lake storage, built on polars."""
from typing import Literal, Optional
from urllib.parse import urlsplit

import fsspec
import polars as pl


class DeltaDataset:
    """Dataset based on deltatable storage."""

    def __init__(self, url: str, partition_columns: Optional[list[str]] = None) -> None:
        """Initialize the dataset.

        Args:
            url: Location of the Delta table; a trailing slash is stripped.
            partition_columns: Columns the table is partitioned by.
                Defaults to no partitioning. A ``None`` sentinel replaces
                the original mutable ``[]`` default, which is shared
                across all calls and instances if ever mutated.
        """
        self.url = url.rstrip("/")
        # Load fsspec filesystem from url scheme.
        # NOTE(review): a plain local path yields an empty scheme from
        # urlsplit — confirm fsspec.filesystem("") resolves as intended.
        location = urlsplit(url)
        self.fs = fsspec.filesystem(location.scheme)
        self.partition_columns = [] if partition_columns is None else partition_columns

    def append(self, df: pl.DataFrame) -> None:
        """Append ``df`` to the table."""
        self._write_delta(df, mode="append")

    def write(self, df: pl.DataFrame) -> None:
        """Overwrite the table with ``df``."""
        self._write_delta(df, mode="overwrite")

    def _write_delta(
        self, df: pl.DataFrame, mode: Literal["append", "overwrite"]
    ) -> None:
        """Validate partition columns, then write ``df`` to the table.

        Raises:
            ValueError: if ``df`` is missing a partition column, or has
                no data columns beyond the partition columns.
        """
        if not set(df.columns).issuperset(set(self.partition_columns)) or len(
            df.columns
        ) <= len(self.partition_columns):
            raise ValueError(
                f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}"
            )
        df.write_delta(
            self.url,
            mode=mode,
            delta_write_options={"partition_by": self.partition_columns},
        )

    def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame:
        """Read a single partition as a DataFrame.

        Args:
            partition_column_values: Mapping of partition column name to
                the value selecting the partition; keys must exactly
                match ``self.partition_columns``.

        Raises:
            ValueError: if the keys do not match the partition columns.
        """
        if set(partition_column_values.keys()) != set(self.partition_columns):
            raise ValueError(
                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
            )

        return pl.read_delta(
            self.url,
            pyarrow_options={
                "partitions": [
                    (pcol, "=", pval) for pcol, pval in partition_column_values.items()
                ]
            },
        )

    def scan(self) -> Optional[pl.LazyFrame]:
        """Lazily scan the table, or return ``None`` if it does not exist."""
        try:
            return pl.scan_delta(self.url)
        except Exception as e:
            # Match deltalake's TableNotFoundError by type name to avoid
            # a hard import dependency on its exception module path.
            if ".TableNotFoundError" in str(type(e)):
                return None
            # Bare raise preserves the original traceback.
            raise
class DeltaDataset:
9class DeltaDataset: 10 """Dataset based on deltatable storage""" 11 12 def __init__(self, url: str, partition_columns: list[str] = []): 13 self.url = url.rstrip("/") 14 # Load fsspec filesystem from url scheme 15 location = urlsplit(url) 16 self.fs = fsspec.filesystem(location.scheme) 17 self.partition_columns = partition_columns 18 19 def append(self, df: pl.DataFrame) -> None: 20 self._write_delta(df, mode="append") 21 22 def write(self, df: pl.DataFrame) -> None: 23 self._write_delta(df, mode="overwrite") 24 25 def _write_delta( 26 self, df: pl.DataFrame, mode: Literal["append", "overwrite"] 27 ) -> None: 28 if not set(df.columns).issuperset(set(self.partition_columns)) or len( 29 df.columns 30 ) <= len(self.partition_columns): 31 raise ValueError( 32 f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}" 33 ) 34 df.write_delta( 35 self.url, 36 mode=mode, 37 delta_write_options={"partition_by": self.partition_columns}, 38 ) 39 40 def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame: 41 if set(partition_column_values.keys()) != set(self.partition_columns): 42 raise ValueError( 43 f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}" 44 ) 45 46 return pl.read_delta( 47 self.url, 48 pyarrow_options={ 49 "partitions": [ 50 (pcol, "=", pval) for pcol, pval in partition_column_values.items() 51 ] 52 }, 53 ) 54 55 def scan(self) -> Optional[pl.LazyFrame]: 56 try: 57 return pl.scan_delta(self.url) 58 except Exception as e: 59 if ".TableNotFoundError" in str(type(e)): 60 return None 61 raise e
Dataset based on Delta table storage
def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame:
40 def read_partition(self, partition_column_values: dict[str, str]) -> pl.DataFrame: 41 if set(partition_column_values.keys()) != set(self.partition_columns): 42 raise ValueError( 43 f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}" 44 ) 45 46 return pl.read_delta( 47 self.url, 48 pyarrow_options={ 49 "partitions": [ 50 (pcol, "=", pval) for pcol, pval in partition_column_values.items() 51 ] 52 }, 53 )