polario.hive_dataset

The hive dataset implementation
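
A minimal usage sketch (not part of the module; assumes a hypothetical local directory /tmp/plain_dataset):

    import polars as pl
    from polario.hive_dataset import HiveDataset

    # Unpartitioned dataset rooted at a local directory
    ds = HiveDataset("/tmp/plain_dataset")
    ds.write(pl.DataFrame({"a": [1, 2], "b": ["x", "y"]}))
    print(ds.scan().collect())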

  1"""The hive dataset implementation"""
  2
  3from __future__ import annotations
  4
  5from collections import OrderedDict
  6from collections.abc import Iterable
  7from functools import reduce
  8from itertools import chain
  9from typing import Optional, Tuple, Type
 10from urllib.parse import urlsplit
 11from uuid import uuid4
 12
 13import fsspec
 14import polars as pl
 15from fsspec.spec import AbstractFileSystem
 16
 17DEFAULT_ROWS_PER_FRAGMENT = int(1e6)
 18
 19DEFAULT_PARQUET_WRITE_OPTIONS = {
 20    "use_pyarrow": True,
 21    "compression": "snappy",
 22}
 23
 24
 25def to_relative_location_from(
 26    possible_prefix: str, base_location: str, location: str
 27) -> str:
 28    """Take a location and make it relative to the base location, stripping possible prefix from both"""
 29    relative_location = location
 30    if location.startswith(possible_prefix):
 31        relative_location = relative_location[len(possible_prefix) :]
 32
 33    # If base location is not absolute, it might be somewhere in location
 34    if not base_location.startswith("/") and base_location in relative_location:
 35        relative_location = relative_location[relative_location.find(base_location) :]
 36
 37    relative_location = relative_location.lstrip("/")
 38    scheme_less_url = base_location[len(possible_prefix) :].lstrip("/")
 39    if relative_location.startswith(scheme_less_url):
 40        relative_location = relative_location[len(scheme_less_url) + 1 :]
 41    return relative_location
 42
 43
 44class ParquetFragment:
 45    """Pointer to a parquet fragment"""
 46
 47    def __init__(self, url: str, parquet_write_options: dict) -> None:
 48        self.url = url
 49        self.parquet_write_options = parquet_write_options
 50
 51    @classmethod
 52    def first_fragment(
 53        cls: Type[ParquetFragment],
 54        partition_base_url: str,
 55        parquet_write_options: dict,
 56    ) -> ParquetFragment:
 57        """Return a fragment name for what should be the first fragment in the partition"""
 58        idx = 0
 59        return cls(
 60            f"{partition_base_url}/{idx:06}_{uuid4().hex}.parquet",
 61            parquet_write_options,
 62        )
 63
 64    def next_fragment(self) -> ParquetFragment:
 65        """Return a fragment name for what should be the next fragment in the partition"""
 66        idx = int(self.url.split("/")[-1].split("_")[0]) + 1
 67        return ParquetFragment(
 68            f"{'/'.join(self.url.split('/')[:-1])}/{idx:06}_{uuid4().hex}.parquet",
 69            self.parquet_write_options,
 70        )
 71
 72    def read(self) -> pl.DataFrame:
 73        """Read the fragment"""
 74        return pl.read_parquet(self.url)
 75
 76    def scan(self) -> pl.LazyFrame:
 77        """Read the fragment"""
 78        return pl.scan_parquet(self.url)
 79
 80    def write(self, df: pl.DataFrame) -> None:
 81        """Write the fragment"""
 82        df.write_parquet(self.url, **self.parquet_write_options)
 83
 84
 85class HivePartition:
 86    """Pointer to a partition in a HiveDataset"""
 87
 88    def __init__(
 89        self,
 90        fs: AbstractFileSystem,
 91        dataset_url: str,
 92        partition_column_values: OrderedDict[str, str],
 93        maximum_rows_per_fragment: int,
 94        parquet_write_options: dict,
 95    ) -> None:
 96        self.fs = fs
 97        self.partition_column_values = partition_column_values
 98        self.maximum_rows_per_fragment = maximum_rows_per_fragment
 99        self.url = (dataset_url + "/" + self.to_relative_path()).rstrip("/")
100        location = urlsplit(dataset_url)
101        self.scheme_prefix = location.scheme + "://" if location.scheme else ""
102        self._parquet_write_options = parquet_write_options
103
104    @classmethod
105    def from_relative_path(
106        cls: Type[HivePartition],
107        fs: AbstractFileSystem,
108        dataset_url: str,
109        relative_path: str,
110        maximum_rows_per_fragment: int,
111        parquet_write_options: dict,
112    ) -> HivePartition:
113        """Create a partition from a relative path"""
114        relative_path_elements = relative_path.split("/")
115        if any(map(lambda x: "=" not in x, relative_path_elements)):
116            raise ValueError(
117                f"One or more parition path elements is missing an equal sign while parsing '{relative_path}' from '{dataset_url}'"
118            )
119
120        return cls(
121            fs=fs,
122            dataset_url=dataset_url,
123            partition_column_values=OrderedDict(
124                map(lambda x: x.split("=", 1), relative_path_elements)
125            ),
126            maximum_rows_per_fragment=maximum_rows_per_fragment,
127            parquet_write_options=parquet_write_options,
128        )
129
130    def to_relative_path(self) -> str:
131        """Create a relative path from partition column values"""
132        return "/".join([f"{k}={v}" for k, v in self.partition_column_values.items()])
133
134    def fragment_urls(self) -> Iterable[str]:
135        try:
136            return map(
137                lambda p: self.url
138                + "/"
139                + to_relative_location_from(self.scheme_prefix, self.url, p),
140                self.fs.expand_path("/".join([self.url, "*.parquet"])),
141            )
142        except FileNotFoundError:
143            return []
144
145    def fragments(self) -> Iterable[ParquetFragment]:
146        """Discover fragments"""
147        return map(
148            lambda fragment_url: ParquetFragment(
149                fragment_url,
150                self._parquet_write_options,
151            ),
152            self.fragment_urls(),
153        )
154
155    def read(self) -> Optional[pl.DataFrame]:
156        """Concat the fragments in this partition into a single dataframe"""
157        fragments = [f.read() for f in self.fragments()]
158        if len(fragments) > 1:
159            # Merge schemas from different fragments into a superset schema
160            superset_schema: dict[str, pl.PolarsDataType] = reduce(
161                lambda a, b: a | dict(b.schema),
162                fragments[1:],
163                dict(fragments[0].schema),
164            )
165
166            def add_missing_columns(df: pl.DataFrame) -> pl.DataFrame:
167                missing_columns = superset_schema.keys() - set(df.columns)
168                if missing_columns:
169                    return df.with_columns(
170                        [
171                            pl.lit(None, superset_schema[col]).alias(col)
172                            for col in missing_columns
173                        ]
174                    )
175                else:
176                    return df
177
178            complete_fragments = [
179                add_missing_columns(f).select_seq(superset_schema.keys())
180                for f in fragments
181            ]
182        else:
183            complete_fragments = fragments
184
185        if complete_fragments:
186            return pl.concat(complete_fragments).with_columns(
187                map(
188                    lambda part: pl.lit(part[1]).alias(part[0]),
189                    self.partition_column_values.items(),
190                )
191            )
192        return None
193
194    def scan(self) -> Optional[pl.LazyFrame]:
195        """Concat the fragments in this partition into a single dataframe"""
196        fragments = [f.scan() for f in self.fragments()]
197        if fragments:
198            return pl.concat(fragments).with_columns(
199                map(
200                    lambda part: pl.lit(part[1]).alias(part[0]),
201                    self.partition_column_values.items(),
202                )
203            )
204        return None
205
206    def _write_to_fragments(
207        self, df: pl.DataFrame, target_fragment: ParquetFragment
208    ) -> None:
209        output_columns = list(
210            sorted(set(df.columns) - self.partition_column_values.keys())
211        )
212        for fragment_df in df.select_seq(output_columns).iter_slices(
213            self.maximum_rows_per_fragment
214        ):
215            target_fragment.write(fragment_df)
216            target_fragment = target_fragment.next_fragment()
217
218    def delete(self) -> None:
219        """Delete the partition"""
220        if self.fs.exists(self.url):
221            self.fs.delete(self.url, recursive=True)
222
223    def write(self, df: pl.DataFrame) -> None:
224        """Write the dataframe to this partition"""
225        self.delete()
226        self.fs.mkdir(self.url)
227        target_fragment = ParquetFragment.first_fragment(
228            self.url, self._parquet_write_options
229        )
230        self._write_to_fragments(df, target_fragment)
231
232    def append(self, df: pl.DataFrame) -> None:
233        """Write the dataframe to this partition"""
234        if not self.fs.exists(self.url):
235            self.fs.mkdir(self.url)
236
237        new_fragment = ParquetFragment.first_fragment(
238            self.url, self._parquet_write_options
239        )
240        try:
241            *_, last_fragment = self.fragments()
242            new_fragment = last_fragment.next_fragment()
243        except ValueError:
244            pass
245        self._write_to_fragments(df, new_fragment)
246
247
248class HiveDataset:
249    """Handle to multiple partitions"""
250
251    def __init__(
252        self,
253        url: str,
254        partition_columns: list[str] | None = None,
255        max_rows_per_fragment: int = DEFAULT_ROWS_PER_FRAGMENT,
256        parquet_write_options: dict = DEFAULT_PARQUET_WRITE_OPTIONS,
257    ) -> None:
258        self.url = url.rstrip("/")
259        # Load fsspec filesystem from url scheme
260        location = urlsplit(url)
261        self.fs = fsspec.filesystem(location.scheme)
262        self.scheme_prefix = location.scheme + "://" if location.scheme else ""
263        self.partition_columns = partition_columns or []
264        self._max_rows_per_fragment = max_rows_per_fragment
265        self._parquet_write_options = parquet_write_options
266
267    def partitions(self) -> Iterable[HivePartition]:
268        """Iterate over HivePartitions"""
269        if self.partition_columns:
270            glob_pattern = HivePartition(
271                fs=self.fs,
272                dataset_url=self.url,
273                partition_column_values=OrderedDict(
274                    {k: "*" for k in self.partition_columns}
275                ),
276                maximum_rows_per_fragment=self._max_rows_per_fragment,
277                parquet_write_options=self._parquet_write_options,
278            ).to_relative_path()
279            try:
280                partitions = self.fs.expand_path(self.url + "/" + glob_pattern)
281
282                return map(
283                    lambda path: HivePartition.from_relative_path(
284                        fs=self.fs,
285                        dataset_url=self.url,
286                        relative_path=to_relative_location_from(
287                            self.scheme_prefix, self.url, path
288                        ),
289                        maximum_rows_per_fragment=self._max_rows_per_fragment,
290                        parquet_write_options=self._parquet_write_options,
291                    ),
292                    sorted(partitions),
293                )
294            except FileNotFoundError:
295                return []
296        else:
297            return [
298                HivePartition(
299                    fs=self.fs,
300                    dataset_url=self.url,
301                    partition_column_values=OrderedDict(),
302                    maximum_rows_per_fragment=self._max_rows_per_fragment,
303                    parquet_write_options=self._parquet_write_options,
304                )
305            ]
306
307    def read_partitions(self) -> Iterable[pl.DataFrame]:
308        """Iterate over partitions"""
309        for partition in self.partitions():
310            df = partition.read()
311            if df is not None:
312                yield df
313
314    def read_partition(
315        self, partition_column_values: dict[str, str]
316    ) -> Optional[pl.DataFrame]:
317        """Read the given partition from the dataset"""
318        if set(partition_column_values.keys()) != set(self.partition_columns):
319            raise ValueError(
320                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
321            )
322        return HivePartition(
323            fs=self.fs,
324            dataset_url=self.url,
325            partition_column_values=OrderedDict(partition_column_values),
326            maximum_rows_per_fragment=self._max_rows_per_fragment,
327            parquet_write_options=self._parquet_write_options,
328        ).read()
329
330    def delete_partition(self, partition_column_values: dict[str, str]) -> None:
331        """Read the given partition from the dataset"""
332        if set(partition_column_values.keys()) != set(self.partition_columns):
333            raise ValueError(
334                f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}"
335            )
336        return HivePartition(
337            fs=self.fs,
338            dataset_url=self.url,
339            partition_column_values=OrderedDict(partition_column_values),
340            maximum_rows_per_fragment=self._max_rows_per_fragment,
341            parquet_write_options=self._parquet_write_options,
342        ).delete()
343
344    def scan_partitions(self) -> Iterable[pl.LazyFrame]:
345        """Iterate over partitions"""
346        for partition in self.partitions():
347            df = partition.scan()
348            if df is not None:
349                yield df
350
351    def scan(self) -> Optional[pl.LazyFrame]:
352        iterable = iter(self.scan_partitions())
353        first_partition = next(iterable, None)
354        if first_partition is not None:
355            return pl.concat(chain([first_partition], iterable))
356        return None
357
358    def _check_partition_columns(self, df: pl.DataFrame) -> None:
359        """Check if the given dataframe fits in this dataset"""
360        if not set(df.columns).issuperset(set(self.partition_columns)) or len(
361            df.columns
362        ) <= len(self.partition_columns):
363            raise ValueError(
364                f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}"
365            )
366        for pcol in self.partition_columns:
367            if df[pcol].dtype != pl.Utf8:
368                raise ValueError(
369                    f"Partition column {pcol} is not a string, but {df[pcol].dtype}"
370                )
371
372    def _partition_split(
373        self, df: pl.DataFrame
374    ) -> Iterable[Tuple[pl.DataFrame, HivePartition]]:
375        """Split dataframe into partitions and partition dataframes"""
376        self._check_partition_columns(df)
377        if self.partition_columns == []:
378            yield (
379                df,
380                HivePartition(
381                    fs=self.fs,
382                    dataset_url=self.url,
383                    partition_column_values=OrderedDict(),
384                    maximum_rows_per_fragment=self._max_rows_per_fragment,
385                    parquet_write_options=self._parquet_write_options,
386                ),
387            )
388        else:
389            partition_values = df.select_seq(self.partition_columns).unique().to_dicts()
390            for partition_value in partition_values:
391                yield (
392                    df.filter(
393                        reduce(
394                            lambda a, b: a & b,
395                            [pl.col(k) == v for k, v in partition_value.items()],
396                        )
397                    ),
398                    HivePartition(
399                        fs=self.fs,
400                        dataset_url=self.url,
401                        partition_column_values=OrderedDict(
402                            [
403                                (pcol, partition_value[pcol])
404                                for pcol in self.partition_columns
405                            ]
406                        ),
407                        maximum_rows_per_fragment=self._max_rows_per_fragment,
408                        parquet_write_options=self._parquet_write_options,
409                    ),
410                )
411
412    def write(self, df: pl.DataFrame) -> None:
413        self._check_partition_columns(df)
414        for partition_df, partition in self._partition_split(df):
415            partition.write(partition_df)
416
417    def append(self, df: pl.DataFrame) -> None:
418        self._check_partition_columns(df)
419        for partition_df, partition in self._partition_split(df):
420            partition.append(partition_df)
DEFAULT_ROWS_PER_FRAGMENT = 1000000
DEFAULT_PARQUET_WRITE_OPTIONS = {'use_pyarrow': True, 'compression': 'snappy'}
def to_relative_location_from(possible_prefix: str, base_location: str, location: str) -> str:

Take a location and make it relative to the base location, stripping possible prefix from both
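
For example, stripping a hypothetical scheme prefix and dataset base from a fragment location leaves only the partition-relative part:

    to_relative_location_from(
        "s3://",
        "s3://bucket/dataset",
        "s3://bucket/dataset/color=red/000000_abc.parquet",
    )
    # -> "color=red/000000_abc.parquet"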

class ParquetFragment:

Pointer to a parquet fragment

ParquetFragment(url: str, parquet_write_options: dict)
url
parquet_write_options
@classmethod
def first_fragment(cls: Type[ParquetFragment], partition_base_url: str, parquet_write_options: dict) -> ParquetFragment:

Return a fragment name for what should be the first fragment in the partition

def next_fragment(self) -> ParquetFragment:

Return a fragment name for what should be the next fragment in the partition
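
Fragment names consist of a zero-padded index and a random hex suffix, so next_fragment only increments the index. A sketch with a hypothetical partition URL:

    frag = ParquetFragment.first_fragment(
        "s3://bucket/dataset/color=red", DEFAULT_PARQUET_WRITE_OPTIONS
    )
    frag.url                  # e.g. ".../color=red/000000_<hex>.parquet"
    frag.next_fragment().url  # e.g. ".../color=red/000001_<hex>.parquet"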

def read(self) -> polars.dataframe.frame.DataFrame:

Read the fragment

def scan(self) -> polars.lazyframe.frame.LazyFrame:

Lazily scan the fragment

def write(self, df: polars.dataframe.frame.DataFrame) -> None:

Write the fragment
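
A round-trip sketch against a hypothetical local directory (the directory must already exist, since write_parquet does not create it):

    import os
    import polars as pl

    os.makedirs("/tmp/dataset/color=red", exist_ok=True)
    frag = ParquetFragment.first_fragment(
        "/tmp/dataset/color=red", DEFAULT_PARQUET_WRITE_OPTIONS
    )
    frag.write(pl.DataFrame({"value": [1, 2, 3]}))
    assert frag.read().height == 3            # eager read
    assert frag.scan().collect().height == 3  # lazy scan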

class HivePartition:

Pointer to a partition in a HiveDataset

HivePartition(fs: fsspec.spec.AbstractFileSystem, dataset_url: str, partition_column_values: collections.OrderedDict[str, str], maximum_rows_per_fragment: int, parquet_write_options: dict)
fs
partition_column_values
maximum_rows_per_fragment
url
scheme_prefix
@classmethod
def from_relative_path(cls: Type[HivePartition], fs: fsspec.spec.AbstractFileSystem, dataset_url: str, relative_path: str, maximum_rows_per_fragment: int, parquet_write_options: dict) -> HivePartition:

Create a partition from a relative path

def to_relative_path(self) -> str:

Create a relative path from partition column values
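
For example, with hypothetical year/month partition values on a local filesystem:

    part = HivePartition(
        fs=fsspec.filesystem("file"),
        dataset_url="/tmp/dataset",
        partition_column_values=OrderedDict([("year", "2024"), ("month", "01")]),
        maximum_rows_per_fragment=DEFAULT_ROWS_PER_FRAGMENT,
        parquet_write_options=DEFAULT_PARQUET_WRITE_OPTIONS,
    )
    part.to_relative_path()  # "year=2024/month=01"
    part.url                 # "/tmp/dataset/year=2024/month=01"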

def fragment_urls(self) -> Iterable[str]:
def fragments(self) -> Iterable[ParquetFragment]:

Discover fragments

def read(self) -> Optional[polars.dataframe.frame.DataFrame]:

Concat the fragments in this partition into a single dataframe
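
The schema merging can be illustrated with plain polars frames (a sketch, not taken from the library): columns missing from a fragment are added as typed nulls before concatenation.

    import polars as pl
    from functools import reduce

    a = pl.DataFrame({"id": [1, 2], "x": [1.0, 2.0]})
    b = pl.DataFrame({"id": [3], "y": ["new"]})

    # Superset schema over both frames, e.g. {"id": Int64, "x": Float64, "y": Utf8}
    superset = reduce(lambda acc, df: acc | dict(df.schema), [b], dict(a.schema))

    def add_missing(df: pl.DataFrame) -> pl.DataFrame:
        missing = superset.keys() - set(df.columns)
        return df.with_columns([pl.lit(None, superset[c]).alias(c) for c in missing])

    merged = pl.concat([add_missing(f).select(list(superset.keys())) for f in (a, b)])
    # 3 rows with columns id, x, y; the absent cells are null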

def scan(self) -> Optional[polars.lazyframe.frame.LazyFrame]:

Concat the fragments in this partition into a single dataframe

def delete(self) -> None:

Delete the partition

def write(self, df: polars.dataframe.frame.DataFrame) -> None:

Write the dataframe to this partition

def append(self, df: polars.dataframe.frame.DataFrame) -> None:

Append the dataframe to this partition
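
Unlike write, existing fragments are kept. A sketch with hypothetical dataframes, assuming part is the HivePartition from above:

    part.write(df_january)    # replaces any existing fragments in the partition
    part.append(df_february)  # keeps them and adds new fragments after the last index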

class HiveDataset:

Handle to multiple partitions
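
A partitioned usage sketch (hypothetical local path and column names; partition column values must be Utf8 strings):

    import polars as pl
    from polario.hive_dataset import HiveDataset

    ds = HiveDataset("/tmp/example_dataset", partition_columns=["color"])
    df = pl.DataFrame({"color": ["red", "red", "blue"], "value": [1, 2, 3]})

    ds.write(df)                         # one color=... directory per distinct value
    ds.read_partition({"color": "red"})  # DataFrame with the color column re-attached
    ds.append(pl.DataFrame({"color": ["blue"], "value": [4]}))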

HiveDataset(url: str, partition_columns: list[str] | None = None, max_rows_per_fragment: int = 1000000, parquet_write_options: dict = {'use_pyarrow': True, 'compression': 'snappy'})
url
fs
scheme_prefix
partition_columns
def partitions(self) -> Iterable[HivePartition]:

Iterate over HivePartitions
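
For example, listing the partitions of the dataset sketched above:

    for part in ds.partitions():
        print(part.partition_column_values, part.url)
    # one HivePartition per color=... directory, sorted by path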

def read_partitions(self) -> Iterable[polars.dataframe.frame.DataFrame]:

Iterate over partitions

def read_partition(self, partition_column_values: dict[str, str]) -> Optional[polars.dataframe.frame.DataFrame]:

Read the given partition from the dataset

def delete_partition(self, partition_column_values: dict[str, str]) -> None:

Delete the given partition from the dataset

def scan_partitions(self) -> Iterable[polars.lazyframe.frame.LazyFrame]:

Iterate over partitions

def scan(self) -> Optional[polars.lazyframe.frame.LazyFrame]:
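
scan concatenates the lazy frames of all partitions, or returns None for an empty dataset. A filtering sketch, reusing the dataset from above:

    lf = ds.scan()
    if lf is not None:
        print(lf.filter(pl.col("value") > 1).collect())
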
def write(self, df: polars.dataframe.frame.DataFrame) -> None:
def append(self, df: polars.dataframe.frame.DataFrame) -> None: