polario.hive_dataset

The hive dataset implementation

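A minimal usage sketch (not part of the module itself), assuming polario is installed and using a made-up local file:// URL; partition columns must be strings:

    import polars as pl

    from polario.hive_dataset import HiveDataset

    # Hypothetical dataset location; any fsspec-supported URL should work the same way
    ds = HiveDataset("file:///tmp/example_dataset", partition_columns=["day"])

    df = pl.DataFrame(
        {
            "day": ["2023-01-01", "2023-01-01", "2023-01-02"],  # partition column, must be Utf8
            "value": [1, 2, 3],
        }
    )

    ds.write(df)  # one day=... directory per distinct partition value
    print(ds.read_partition({"day": "2023-01-02"}))
    print(ds.scan().filter(pl.col("value") > 1).collect())
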
  1"""The hive dataset implementation"""
  2from collections import OrderedDict
  3from functools import reduce
  4from itertools import chain
  5from typing import Iterable, Optional, Tuple, Type
  6from urllib.parse import urlsplit
  7from uuid import uuid4
  8
  9import fsspec
 10import polars as pl
 11from fsspec.spec import AbstractFileSystem
 12
 13DEFAULT_ROWS_PER_FRAGMENT = int(1e6)
 14
 15DEFAULT_PARQUET_WRITE_OPTIONS = {
 16    "use_pyarrow": True,
 17    "compression": "snappy",
 18}
 19
 20
def to_relative_location_from(
    possible_prefix: str, base_location: str, location: str
) -> str:
    """Take a location and make it relative to the base location, stripping possible prefix from both"""
    relative_location = location
    if location.startswith(possible_prefix):
        relative_location = relative_location[len(possible_prefix) :]

    # If base location is not absolute, it might be somewhere in location
    if not base_location.startswith("/"):
        if base_location in relative_location:
            relative_location = relative_location[
                relative_location.find(base_location) :
            ]

    relative_location = relative_location.lstrip("/")
    scheme_less_url = base_location[len(possible_prefix) :].lstrip("/")
    if relative_location.startswith(scheme_less_url):
        relative_location = relative_location[len(scheme_less_url) + 1 :]
    return relative_location

class ParquetFragment:
    """Pointer to a parquet fragment"""

    def __init__(self, url: str, parquet_write_options: dict):
        self.url = url
        self.parquet_write_options = parquet_write_options

    @classmethod
    def first_fragment(
        cls: Type["ParquetFragment"],
        partition_base_url: str,
        parquet_write_options: dict,
    ) -> "ParquetFragment":
        """Return a fragment name for what should be the first fragment in the partition"""
        idx = 0
        return cls(
            f"{partition_base_url}/{idx:06}_{uuid4().hex}.parquet",
            parquet_write_options,
        )

    def next_fragment(self) -> "ParquetFragment":
        """Return a fragment name for what should be the next fragment in the partition"""
        idx = int(self.url.split("/")[-1].split("_")[0]) + 1
        return ParquetFragment(
            f"{'/'.join(self.url.split('/')[:-1])}/{idx:06}_{uuid4().hex}.parquet",
            self.parquet_write_options,
        )

    def read(self) -> pl.DataFrame:
        """Read the fragment"""
        return pl.read_parquet(self.url)

    def scan(self) -> pl.LazyFrame:
        """Lazily scan the fragment"""
        return pl.scan_parquet(self.url)

    def write(self, df: pl.DataFrame) -> None:
        """Write the fragment"""
        df.write_parquet(self.url, **self.parquet_write_options)

class HivePartition:
    """Pointer to a partition in a HiveDataset"""

    def __init__(
        self,
        fs: AbstractFileSystem,
        dataset_url: str,
        partition_column_values: OrderedDict[str, str],
        maximum_rows_per_fragment: int,
        parquet_write_options: dict,
    ) -> None:
        self.fs = fs
        self.partition_column_values = partition_column_values
        self.maximum_rows_per_fragment = maximum_rows_per_fragment
        self.url = (dataset_url + "/" + self.to_relative_path()).rstrip("/")
        location = urlsplit(dataset_url)
        self.scheme_prefix = location.scheme + "://" if location.scheme else ""
        self._parquet_write_options = parquet_write_options

    @classmethod
    def from_relative_path(
        cls: Type["HivePartition"],
        fs: AbstractFileSystem,
        dataset_url: str,
        relative_path: str,
        maximum_rows_per_fragment: int,
        parquet_write_options: dict,
    ) -> "HivePartition":
        """Create a partition from a relative path"""
        relative_path_elements = relative_path.split("/")
        if any(map(lambda x: "=" not in x, relative_path_elements)):
            raise ValueError(
                f"One or more partition path elements are missing an equal sign while parsing '{relative_path}' from '{dataset_url}'"
            )

        return cls(
            fs=fs,
            dataset_url=dataset_url,
            partition_column_values=OrderedDict(
                map(lambda x: x.split("=", 1), relative_path_elements)
            ),
            maximum_rows_per_fragment=maximum_rows_per_fragment,
            parquet_write_options=parquet_write_options,
        )

    def to_relative_path(self) -> str:
        """Create a relative path from partition column values"""
        return "/".join([f"{k}={v}" for k, v in self.partition_column_values.items()])

    def fragment_urls(self) -> Iterable[str]:
        """Discover the URLs of the parquet fragments in this partition"""
        try:
            return map(
                lambda p: self.url
                + "/"
                + to_relative_location_from(self.scheme_prefix, self.url, p),
                self.fs.expand_path("/".join([self.url, "*.parquet"])),
            )
        except FileNotFoundError:
            return []

    def fragments(self) -> Iterable[ParquetFragment]:
        """Discover fragments"""
        return map(
            lambda fragment_url: ParquetFragment(
                fragment_url,
                self._parquet_write_options,
            ),
            self.fragment_urls(),
        )
    def read(self) -> Optional[pl.DataFrame]:
        """Concat the fragments in this partition into a single dataframe"""
        fragments = [f.read() for f in self.fragments()]
        if len(fragments) > 1:
            # Merge schemas from different fragments into a superset schema
            superset_schema: dict[str, pl.PolarsDataType] = reduce(
                lambda a, b: a | dict(b.schema),
                fragments[1:],
                dict(fragments[0].schema),
            )

            def add_missing_columns(df: pl.DataFrame) -> pl.DataFrame:
                missing_columns = superset_schema.keys() - set(df.columns)
                if missing_columns:
                    return df.with_columns(
                        [
                            pl.lit(None, superset_schema[col]).alias(col)
                            for col in missing_columns
                        ]
                    )
                else:
                    return df

            complete_fragments = [
                add_missing_columns(f).select(superset_schema.keys()) for f in fragments
            ]
        else:
            complete_fragments = fragments

        if complete_fragments:
            return pl.concat(complete_fragments).with_columns(
                map(
                    lambda part: pl.lit(part[1]).alias(part[0]),
                    self.partition_column_values.items(),
                )
            )
        return None

    def scan(self) -> Optional[pl.LazyFrame]:
        """Lazily concat the fragments in this partition into a single LazyFrame"""
        fragments = [f.scan() for f in self.fragments()]
        if fragments:
            return pl.concat(fragments).with_columns(
                map(
                    lambda part: pl.lit(part[1]).alias(part[0]),
                    self.partition_column_values.items(),
                )
            )
        return None

    def _write_to_fragments(
        self, df: pl.DataFrame, target_fragment: ParquetFragment
    ) -> None:
        """Write the dataframe as one or more fragments, starting at the given fragment"""
        output_columns = list(
            sorted(set(df.columns) - self.partition_column_values.keys())
        )
        for fragment_df in df.select(output_columns).iter_slices(
            self.maximum_rows_per_fragment
        ):
            target_fragment.write(fragment_df)
            target_fragment = target_fragment.next_fragment()

    def delete(self) -> None:
        """Delete the partition"""
        if self.fs.exists(self.url):
            self.fs.delete(self.url, recursive=True)

    def write(self, df: pl.DataFrame) -> None:
        """Write the dataframe to this partition"""
        self.delete()
        self.fs.mkdir(self.url)
        target_fragment = ParquetFragment.first_fragment(
            self.url, self._parquet_write_options
        )
        self._write_to_fragments(df, target_fragment)

    def append(self, df: pl.DataFrame) -> None:
        """Append the dataframe to this partition"""
        if not self.fs.exists(self.url):
            self.fs.mkdir(self.url)

        new_fragment = ParquetFragment.first_fragment(
            self.url, self._parquet_write_options
        )
        try:
            *_, last_fragment = self.fragments()
            new_fragment = last_fragment.next_fragment()
        except ValueError:
            pass
        self._write_to_fragments(df, new_fragment)

class HiveDataset:
    """Handle to multiple partitions"""

    def __init__(
        self,
        url: str,
        partition_columns: list[str] = [],
        max_rows_per_fragment: int = DEFAULT_ROWS_PER_FRAGMENT,
        parquet_write_options: dict = DEFAULT_PARQUET_WRITE_OPTIONS,
    ) -> None:
        self.url = url.rstrip("/")
        # Load fsspec filesystem from url scheme
        location = urlsplit(url)
        self.fs = fsspec.filesystem(location.scheme)
        self.scheme_prefix = location.scheme + "://" if location.scheme else ""
        self.partition_columns = partition_columns
        self._max_rows_per_fragment = max_rows_per_fragment
        self._parquet_write_options = parquet_write_options

    def partitions(self) -> Iterable[HivePartition]:
        """Iterate over HivePartitions"""
        if self.partition_columns:
            glob_pattern = HivePartition(
                fs=self.fs,
                dataset_url=self.url,
                partition_column_values=OrderedDict(
                    {k: "*" for k in self.partition_columns}
                ),
                maximum_rows_per_fragment=self._max_rows_per_fragment,
                parquet_write_options=self._parquet_write_options,
            ).to_relative_path()
            try:
                partitions = self.fs.expand_path(self.url + "/" + glob_pattern)

                return map(
                    lambda path: HivePartition.from_relative_path(
                        fs=self.fs,
                        dataset_url=self.url,
                        relative_path=to_relative_location_from(
                            self.scheme_prefix, self.url, path
                        ),
                        maximum_rows_per_fragment=self._max_rows_per_fragment,
                        parquet_write_options=self._parquet_write_options,
                    ),
                    sorted(partitions),
                )
            except FileNotFoundError:
                return []
        else:
            return [
                HivePartition(
                    fs=self.fs,
                    dataset_url=self.url,
                    partition_column_values=OrderedDict(),
                    maximum_rows_per_fragment=self._max_rows_per_fragment,
                    parquet_write_options=self._parquet_write_options,
                )
            ]

    def read_partitions(self) -> Iterable[pl.DataFrame]:
        """Iterate over partitions"""
        for partition in self.partitions():
            df = partition.read()
            if df is not None:
                yield df

    def read_partition(
        self, partition_column_values: dict[str, str]
    ) -> Optional[pl.DataFrame]:
        """Read the given partition from the dataset"""
        if set(partition_column_values.keys()) != set(self.partition_columns):
            raise ValueError(
                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
            )
        return HivePartition(
            fs=self.fs,
            dataset_url=self.url,
            partition_column_values=OrderedDict(partition_column_values),
            maximum_rows_per_fragment=self._max_rows_per_fragment,
            parquet_write_options=self._parquet_write_options,
        ).read()

    def delete_partition(self, partition_column_values: dict[str, str]) -> None:
        """Delete the given partition from the dataset"""
        if set(partition_column_values.keys()) != set(self.partition_columns):
            raise ValueError(
                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
            )
        return HivePartition(
            fs=self.fs,
            dataset_url=self.url,
            partition_column_values=OrderedDict(partition_column_values),
            maximum_rows_per_fragment=self._max_rows_per_fragment,
            parquet_write_options=self._parquet_write_options,
        ).delete()

    def scan_partitions(self) -> Iterable[pl.LazyFrame]:
        """Iterate over partitions"""
        for partition in self.partitions():
            df = partition.scan()
            if df is not None:
                yield df

    def scan(self) -> Optional[pl.LazyFrame]:
        """Lazily scan the whole dataset as a single LazyFrame"""
        iterable = iter(self.scan_partitions())
        first_partition = next(iterable, None)
        if first_partition is not None:
            return pl.concat(chain([first_partition], iterable))
        return None

    def _check_partition_columns(self, df: pl.DataFrame) -> None:
        """Check if the given dataframe fits in this dataset"""
        if not set(df.columns).issuperset(set(self.partition_columns)) or len(
            df.columns
        ) <= len(self.partition_columns):
            raise ValueError(
                f"Dataframe must contain the partition columns {self.partition_columns} and at least one data column, got {df.columns}"
            )
        for pcol in self.partition_columns:
            if df[pcol].dtype != pl.Utf8:
                raise ValueError(
                    f"Partition column {pcol} is not a string, but {df[pcol].dtype}"
                )

    def _partition_split(
        self, df: pl.DataFrame
    ) -> Iterable[Tuple[pl.DataFrame, HivePartition]]:
        """Split dataframe into partitions and partition dataframes"""
        self._check_partition_columns(df)
        if self.partition_columns == []:
            yield df, HivePartition(
                fs=self.fs,
                dataset_url=self.url,
                partition_column_values=OrderedDict(),
                maximum_rows_per_fragment=self._max_rows_per_fragment,
                parquet_write_options=self._parquet_write_options,
            )
        else:
            partition_values = df.select(self.partition_columns).unique().to_dicts()
            for partition_value in partition_values:
                yield df.filter(
                    reduce(
                        lambda a, b: a & b,
                        [pl.col(k) == v for k, v in partition_value.items()],
                    )
                ), HivePartition(
                    fs=self.fs,
                    dataset_url=self.url,
                    partition_column_values=OrderedDict(
                        [
                            (pcol, partition_value[pcol])
                            for pcol in self.partition_columns
                        ]
                    ),
                    maximum_rows_per_fragment=self._max_rows_per_fragment,
                    parquet_write_options=self._parquet_write_options,
                )

    def write(self, df: pl.DataFrame) -> None:
        """Write the dataframe to the dataset, overwriting any data in the partitions it touches"""
        self._check_partition_columns(df)
        for partition_df, partition in self._partition_split(df):
            partition.write(partition_df)

    def append(self, df: pl.DataFrame) -> None:
        """Append the dataframe to the dataset, adding new fragments to the partitions it touches"""
        self._check_partition_columns(df)
        for partition_df, partition in self._partition_split(df):
            partition.append(partition_df)
def to_relative_location_from(possible_prefix: str, base_location: str, location: str) -> str:

Take a location and make it relative to the base location, stripping possible prefix from both

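An illustrative trace of the helper (the URLs are invented for the example): the scheme prefix and the dataset base path are stripped, leaving a path relative to the base location.

    from polario.hive_dataset import to_relative_location_from

    to_relative_location_from(
        "s3://",
        "s3://bucket/dataset",
        "s3://bucket/dataset/day=2023-01-01/000000_abc.parquet",
    )
    # -> 'day=2023-01-01/000000_abc.parquet'
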
class ParquetFragment:

Pointer to a parquet fragment

ParquetFragment(url: str, parquet_write_options: dict)
@classmethod
def first_fragment(cls: Type[polario.hive_dataset.ParquetFragment], partition_base_url: str, parquet_write_options: dict) -> polario.hive_dataset.ParquetFragment:

Return a fragment name for what should be the first fragment in the partition

def next_fragment(self) -> polario.hive_dataset.ParquetFragment:

Return a fragment name for what should be the next fragment in the partition

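A small sketch of the fragment naming scheme these two constructors produce; the partition URL is invented and the uuid4 hex suffix differs on every call.

    from polario.hive_dataset import DEFAULT_PARQUET_WRITE_OPTIONS, ParquetFragment

    first = ParquetFragment.first_fragment(
        "file:///tmp/ds/day=2023-01-01", DEFAULT_PARQUET_WRITE_OPTIONS
    )
    print(first.url)                  # .../day=2023-01-01/000000_<hex>.parquet
    print(first.next_fragment().url)  # .../day=2023-01-01/000001_<hex>.parquet
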
def read(self) -> polars.dataframe.frame.DataFrame:

Read the fragment

def scan(self) -> polars.lazyframe.frame.LazyFrame:

Lazily scan the fragment

def write(self, df: polars.dataframe.frame.DataFrame) -> None:

Write the fragment

class HivePartition:

Pointer to a partition in a HiveDataset

HivePartition(fs: fsspec.spec.AbstractFileSystem, dataset_url: str, partition_column_values: collections.OrderedDict[str, str], maximum_rows_per_fragment: int, parquet_write_options: dict)
@classmethod
def from_relative_path(cls: Type[polario.hive_dataset.HivePartition], fs: fsspec.spec.AbstractFileSystem, dataset_url: str, relative_path: str, maximum_rows_per_fragment: int, parquet_write_options: dict) -> polario.hive_dataset.HivePartition:

Create a partition from a relative path

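A sketch of the round trip between a relative path and partition column values; the local dataset URL is only illustrative.

    import fsspec

    from polario.hive_dataset import (
        DEFAULT_PARQUET_WRITE_OPTIONS,
        DEFAULT_ROWS_PER_FRAGMENT,
        HivePartition,
    )

    part = HivePartition.from_relative_path(
        fs=fsspec.filesystem("file"),
        dataset_url="file:///tmp/ds",
        relative_path="year=2023/month=01",
        maximum_rows_per_fragment=DEFAULT_ROWS_PER_FRAGMENT,
        parquet_write_options=DEFAULT_PARQUET_WRITE_OPTIONS,
    )
    print(part.partition_column_values)  # year -> '2023', month -> '01'
    print(part.to_relative_path())       # year=2023/month=01
    print(part.url)                      # file:///tmp/ds/year=2023/month=01
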
def to_relative_path(self) -> str:

Create a relative path from partition column values

def fragment_urls(self) -> Iterable[str]:

Discover the URLs of the parquet fragments in this partition

def fragments(self) -> Iterable[polario.hive_dataset.ParquetFragment]:

Discover fragments

def read(self) -> Optional[polars.dataframe.frame.DataFrame]:

Concat the fragments in this partition into a single dataframe

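The schema merging is easiest to see through the dataset-level API. A sketch, assuming a writable local directory: two fragments with different columns are read back as one dataframe, with the missing column filled with nulls.

    import polars as pl

    from polario.hive_dataset import HiveDataset

    ds = HiveDataset("file:///tmp/schema_merge_example")  # unpartitioned, made-up path
    ds.write(pl.DataFrame({"a": [1, 2]}))
    ds.append(pl.DataFrame({"a": [3], "b": ["x"]}))

    # read() unifies the fragment schemas; 'b' is null for the first fragment's rows
    print(next(iter(ds.read_partitions())))
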
def scan(self) -> Optional[polars.lazyframe.frame.LazyFrame]:

Lazily concat the fragments in this partition into a single LazyFrame

def delete(self) -> None:

Delete the partition

def write(self, df: polars.dataframe.frame.DataFrame) -> None:

Write the dataframe to this partition

def append(self, df: polars.dataframe.frame.DataFrame) -> None:

Append the dataframe to this partition

class HiveDataset:

Handle to multiple partitions

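A sketch contrasting write and append on a partitioned dataset (URL and data invented): write replaces the partitions it touches, append adds new fragments to them.

    import polars as pl

    from polario.hive_dataset import HiveDataset

    ds = HiveDataset("file:///tmp/events", partition_columns=["day"])

    ds.write(pl.DataFrame({"day": ["2023-01-01"], "value": [1]}))
    ds.append(pl.DataFrame({"day": ["2023-01-01"], "value": [2]}))  # second fragment
    ds.write(pl.DataFrame({"day": ["2023-01-01"], "value": [3]}))   # partition is replaced

    print(ds.read_partition({"day": "2023-01-01"}))  # only value 3 remains
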
HiveDataset(url: str, partition_columns: list[str] = [], max_rows_per_fragment: int = 1000000, parquet_write_options: dict = {'use_pyarrow': True, 'compression': 'snappy'})
def partitions(self) -> Iterable[polario.hive_dataset.HivePartition]:

Iterate over HivePartitions

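A sketch of enumerating the discovered partitions; it assumes a dataset like the one in the example above already exists at the made-up URL.

    from polario.hive_dataset import HiveDataset

    ds = HiveDataset("file:///tmp/events", partition_columns=["day"])
    for partition in ds.partitions():
        print(partition.to_relative_path(), list(partition.fragment_urls()))
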
def read_partitions(self) -> Iterable[polars.dataframe.frame.DataFrame]:

Iterate over partitions

def read_partition(self, partition_column_values: dict[str, str]) -> Optional[polars.dataframe.frame.DataFrame]:

Read the given partition from the dataset

def delete_partition(self, partition_column_values: dict[str, str]) -> None:

Delete the given partition from the dataset

def scan_partitions(self) -> Iterable[polars.lazyframe.frame.LazyFrame]:

Iterate over partitions

def scan(self) -> Optional[polars.lazyframe.frame.LazyFrame]:

Lazily scan the whole dataset as a single LazyFrame

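A sketch of lazily querying the whole dataset (same made-up URL as above); nothing is materialised until collect is called.

    import polars as pl

    from polario.hive_dataset import HiveDataset

    ds = HiveDataset("file:///tmp/events", partition_columns=["day"])
    lf = ds.scan()  # None if the dataset has no fragments yet
    if lf is not None:
        print(lf.filter(pl.col("day") == "2023-01-01").collect())
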
def write(self, df: polars.dataframe.frame.DataFrame) -> None:

Write the dataframe to the dataset, overwriting any data in the partitions it touches

def append(self, df: polars.dataframe.frame.DataFrame) -> None:

Append the dataframe to the dataset, adding new fragments to the partitions it touches