polario.hive_dataset
The hive dataset implementation
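A minimal usage sketch of the module, assuming a local file:// dataset URL and an illustrative string-typed partition column named date (the URL, column names, and values below are examples, not part of the module):

import polars as pl

from polario.hive_dataset import HiveDataset

# Partition columns must be Utf8-typed; all other columns are written into parquet fragments.
ds = HiveDataset("file:///tmp/example_dataset", partition_columns=["date"])

df = pl.DataFrame(
    {
        "date": ["2024-01-01", "2024-01-01", "2024-01-02"],
        "value": [1, 2, 3],
    }
)

ds.write(df)   # overwrites the partitions present in df, e.g. date=2024-01-01/
ds.append(df)  # adds new fragments to the existing partitions instead

single_day = ds.read_partition({"date": "2024-01-01"})  # eager DataFrame, or None
everything = ds.scan()  # lazy scan over all partitions, or None if the dataset is empty
if everything is not None:
    print(everything.collect())

Each partition is stored as a column=value/ directory under the dataset URL, and its rows are split into fragments named 000000_<uuid hex>.parquet, 000001_<uuid hex>.parquet, and so on, with at most max_rows_per_fragment rows each.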
1"""The hive dataset implementation""" 2 3from __future__ import annotations 4 5from collections import OrderedDict 6from collections.abc import Iterable 7from functools import reduce 8from itertools import chain 9from typing import Optional, Tuple, Type 10from urllib.parse import urlsplit 11from uuid import uuid4 12 13import fsspec 14import polars as pl 15from fsspec.spec import AbstractFileSystem 16 17DEFAULT_ROWS_PER_FRAGMENT = int(1e6) 18 19DEFAULT_PARQUET_WRITE_OPTIONS = { 20 "use_pyarrow": True, 21 "compression": "snappy", 22} 23 24 25def to_relative_location_from( 26 possible_prefix: str, base_location: str, location: str 27) -> str: 28 """Take a location and make it relative to the base location, stripping possible prefix from both""" 29 relative_location = location 30 if location.startswith(possible_prefix): 31 relative_location = relative_location[len(possible_prefix) :] 32 33 # If base location is not absolute, it might be somewhere in location 34 if not base_location.startswith("/") and base_location in relative_location: 35 relative_location = relative_location[relative_location.find(base_location) :] 36 37 relative_location = relative_location.lstrip("/") 38 scheme_less_url = base_location[len(possible_prefix) :].lstrip("/") 39 if relative_location.startswith(scheme_less_url): 40 relative_location = relative_location[len(scheme_less_url) + 1 :] 41 return relative_location 42 43 44class ParquetFragment: 45 """Pointer to a parquet fragment""" 46 47 def __init__(self, url: str, parquet_write_options: dict) -> None: 48 self.url = url 49 self.parquet_write_options = parquet_write_options 50 51 @classmethod 52 def first_fragment( 53 cls: Type[ParquetFragment], 54 partition_base_url: str, 55 parquet_write_options: dict, 56 ) -> ParquetFragment: 57 """Return a fragment name for what should be the first fragment in the partition""" 58 idx = 0 59 return cls( 60 f"{partition_base_url}/{idx:06}_{uuid4().hex}.parquet", 61 parquet_write_options, 62 ) 63 64 def next_fragment(self) -> ParquetFragment: 65 """Return a fragment name for what should be the next fragment in the partition""" 66 idx = int(self.url.split("/")[-1].split("_")[0]) + 1 67 return ParquetFragment( 68 f"{'/'.join(self.url.split('/')[:-1])}/{idx:06}_{uuid4().hex}.parquet", 69 self.parquet_write_options, 70 ) 71 72 def read(self) -> pl.DataFrame: 73 """Read the fragment""" 74 return pl.read_parquet(self.url) 75 76 def scan(self) -> pl.LazyFrame: 77 """Read the fragment""" 78 return pl.scan_parquet(self.url) 79 80 def write(self, df: pl.DataFrame) -> None: 81 """Write the fragment""" 82 df.write_parquet(self.url, **self.parquet_write_options) 83 84 85class HivePartition: 86 """Pointer to a partition in a HiveDataset""" 87 88 def __init__( 89 self, 90 fs: AbstractFileSystem, 91 dataset_url: str, 92 partition_column_values: OrderedDict[str, str], 93 maximum_rows_per_fragment: int, 94 parquet_write_options: dict, 95 ) -> None: 96 self.fs = fs 97 self.partition_column_values = partition_column_values 98 self.maximum_rows_per_fragment = maximum_rows_per_fragment 99 self.url = (dataset_url + "/" + self.to_relative_path()).rstrip("/") 100 location = urlsplit(dataset_url) 101 self.scheme_prefix = location.scheme + "://" if location.scheme else "" 102 self._parquet_write_options = parquet_write_options 103 104 @classmethod 105 def from_relative_path( 106 cls: Type[HivePartition], 107 fs: AbstractFileSystem, 108 dataset_url: str, 109 relative_path: str, 110 maximum_rows_per_fragment: int, 111 parquet_write_options: dict, 112 ) -> 
HivePartition: 113 """Create a partition from a relative path""" 114 relative_path_elements = relative_path.split("/") 115 if any(map(lambda x: "=" not in x, relative_path_elements)): 116 raise ValueError( 117 f"One or more parition path elements is missing an equal sign while parsing '{relative_path}' from '{dataset_url}'" 118 ) 119 120 return cls( 121 fs=fs, 122 dataset_url=dataset_url, 123 partition_column_values=OrderedDict( 124 map(lambda x: x.split("=", 1), relative_path_elements) 125 ), 126 maximum_rows_per_fragment=maximum_rows_per_fragment, 127 parquet_write_options=parquet_write_options, 128 ) 129 130 def to_relative_path(self) -> str: 131 """Create a relative path from partition column values""" 132 return "/".join([f"{k}={v}" for k, v in self.partition_column_values.items()]) 133 134 def fragment_urls(self) -> Iterable[str]: 135 try: 136 return map( 137 lambda p: self.url 138 + "/" 139 + to_relative_location_from(self.scheme_prefix, self.url, p), 140 self.fs.expand_path("/".join([self.url, "*.parquet"])), 141 ) 142 except FileNotFoundError: 143 return [] 144 145 def fragments(self) -> Iterable[ParquetFragment]: 146 """Discover fragments""" 147 return map( 148 lambda fragment_url: ParquetFragment( 149 fragment_url, 150 self._parquet_write_options, 151 ), 152 self.fragment_urls(), 153 ) 154 155 def read(self) -> Optional[pl.DataFrame]: 156 """Concat the fragments in this partition into a single dataframe""" 157 fragments = [f.read() for f in self.fragments()] 158 if len(fragments) > 1: 159 # Merge schemas from different fragments into a superset schema 160 superset_schema: dict[str, pl.PolarsDataType] = reduce( 161 lambda a, b: a | dict(b.schema), 162 fragments[1:], 163 dict(fragments[0].schema), 164 ) 165 166 def add_missing_columns(df: pl.DataFrame) -> pl.DataFrame: 167 missing_columns = superset_schema.keys() - set(df.columns) 168 if missing_columns: 169 return df.with_columns( 170 [ 171 pl.lit(None, superset_schema[col]).alias(col) 172 for col in missing_columns 173 ] 174 ) 175 else: 176 return df 177 178 complete_fragements = [ 179 add_missing_columns(f).select_seq(superset_schema.keys()) 180 for f in fragments 181 ] 182 else: 183 complete_fragements = fragments 184 185 if complete_fragements: 186 return pl.concat(complete_fragements).with_columns( 187 map( 188 lambda part: pl.lit(part[1]).alias(part[0]), 189 self.partition_column_values.items(), 190 ) 191 ) 192 return None 193 194 def scan(self) -> Optional[pl.LazyFrame]: 195 """Concat the fragments in this partition into a single dataframe""" 196 fragments = [f.scan() for f in self.fragments()] 197 if fragments: 198 return pl.concat(fragments).with_columns( 199 map( 200 lambda part: pl.lit(part[1]).alias(part[0]), 201 self.partition_column_values.items(), 202 ) 203 ) 204 return None 205 206 def _write_to_fragments( 207 self, df: pl.DataFrame, target_fragment: ParquetFragment 208 ) -> None: 209 output_columns = list( 210 sorted(set(df.columns) - self.partition_column_values.keys()) 211 ) 212 for fragment_df in df.select_seq(output_columns).iter_slices( 213 self.maximum_rows_per_fragment 214 ): 215 target_fragment.write(fragment_df) 216 target_fragment = target_fragment.next_fragment() 217 218 def delete(self) -> None: 219 """Delete the partition""" 220 if self.fs.exists(self.url): 221 self.fs.delete(self.url, recursive=True) 222 223 def write(self, df: pl.DataFrame) -> None: 224 """Write the dataframe to this partition""" 225 self.delete() 226 self.fs.mkdir(self.url) 227 target_fragment = ParquetFragment.first_fragment( 228 
self.url, self._parquet_write_options 229 ) 230 self._write_to_fragments(df, target_fragment) 231 232 def append(self, df: pl.DataFrame) -> None: 233 """Write the dataframe to this partition""" 234 if not self.fs.exists(self.url): 235 self.fs.mkdir(self.url) 236 237 new_fragment = ParquetFragment.first_fragment( 238 self.url, self._parquet_write_options 239 ) 240 try: 241 *_, last_fragment = self.fragments() 242 new_fragment = last_fragment.next_fragment() 243 except ValueError: 244 pass 245 self._write_to_fragments(df, new_fragment) 246 247 248class HiveDataset: 249 """Handle to multiple partitions""" 250 251 def __init__( 252 self, 253 url: str, 254 partition_columns: list[str] | None = None, 255 max_rows_per_fragment: int = DEFAULT_ROWS_PER_FRAGMENT, 256 parquet_write_options: dict = DEFAULT_PARQUET_WRITE_OPTIONS, 257 ) -> None: 258 self.url = url.rstrip("/") 259 # Load fsspec filesystem from url scheme 260 location = urlsplit(url) 261 self.fs = fsspec.filesystem(location.scheme) 262 self.scheme_prefix = location.scheme + "://" if location.scheme else "" 263 self.partition_columns = partition_columns or [] 264 self._max_rows_per_fragment = max_rows_per_fragment 265 self._parquet_write_options = parquet_write_options 266 267 def partitions(self) -> Iterable[HivePartition]: 268 """Iterate over HivePartitions""" 269 if self.partition_columns: 270 glob_pattern = HivePartition( 271 fs=self.fs, 272 dataset_url=self.url, 273 partition_column_values=OrderedDict( 274 {k: "*" for k in self.partition_columns} 275 ), 276 maximum_rows_per_fragment=self._max_rows_per_fragment, 277 parquet_write_options=self._parquet_write_options, 278 ).to_relative_path() 279 try: 280 partitions = self.fs.expand_path(self.url + "/" + glob_pattern) 281 282 return map( 283 lambda path: HivePartition.from_relative_path( 284 fs=self.fs, 285 dataset_url=self.url, 286 relative_path=to_relative_location_from( 287 self.scheme_prefix, self.url, path 288 ), 289 maximum_rows_per_fragment=self._max_rows_per_fragment, 290 parquet_write_options=self._parquet_write_options, 291 ), 292 sorted(partitions), 293 ) 294 except FileNotFoundError: 295 return [] 296 else: 297 return [ 298 HivePartition( 299 fs=self.fs, 300 dataset_url=self.url, 301 partition_column_values=OrderedDict(), 302 maximum_rows_per_fragment=self._max_rows_per_fragment, 303 parquet_write_options=self._parquet_write_options, 304 ) 305 ] 306 307 def read_partitions(self) -> Iterable[pl.DataFrame]: 308 """Iterate over partitions""" 309 for partition in self.partitions(): 310 df = partition.read() 311 if df is not None: 312 yield df 313 314 def read_partition( 315 self, partition_column_values: dict[str, str] 316 ) -> Optional[pl.DataFrame]: 317 """Read the given partition from the dataset""" 318 if set(partition_column_values.keys()) != set(self.partition_columns): 319 raise ValueError( 320 f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}" 321 ) 322 return HivePartition( 323 fs=self.fs, 324 dataset_url=self.url, 325 partition_column_values=OrderedDict(partition_column_values), 326 maximum_rows_per_fragment=self._max_rows_per_fragment, 327 parquet_write_options=self._parquet_write_options, 328 ).read() 329 330 def delete_partition(self, partition_column_values: dict[str, str]) -> None: 331 """Read the given partition from the dataset""" 332 if set(partition_column_values.keys()) != set(self.partition_columns): 333 raise ValueError( 334 f"Partition column value keys {partition_column_values} do not match 
partition columns {self.partition_columns}" 335 ) 336 return HivePartition( 337 fs=self.fs, 338 dataset_url=self.url, 339 partition_column_values=OrderedDict(partition_column_values), 340 maximum_rows_per_fragment=self._max_rows_per_fragment, 341 parquet_write_options=self._parquet_write_options, 342 ).delete() 343 344 def scan_partitions(self) -> Iterable[pl.LazyFrame]: 345 """Iterate over partitions""" 346 for partition in self.partitions(): 347 df = partition.scan() 348 if df is not None: 349 yield df 350 351 def scan(self) -> Optional[pl.LazyFrame]: 352 iterable = iter(self.scan_partitions()) 353 first_partition = next(iterable, None) 354 if first_partition is not None: 355 return pl.concat(chain([first_partition], iterable)) 356 return None 357 358 def _check_partition_columns(self, df: pl.DataFrame) -> None: 359 """Check if the given dataframe fits in this dataset""" 360 if not set(df.columns).issuperset(set(self.partition_columns)) or len( 361 df.columns 362 ) <= len(self.partition_columns): 363 raise ValueError( 364 f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}" 365 ) 366 for pcol in self.partition_columns: 367 if df[pcol].dtype != pl.Utf8: 368 raise ValueError( 369 f"Partition column {pcol} is not a string, but {df[pcol].dtype}" 370 ) 371 372 def _partition_split( 373 self, df: pl.DataFrame 374 ) -> Iterable[Tuple[pl.DataFrame, HivePartition]]: 375 """Split dataframe into partitions and partition dataframes""" 376 self._check_partition_columns(df) 377 if self.partition_columns == []: 378 yield ( 379 df, 380 HivePartition( 381 fs=self.fs, 382 dataset_url=self.url, 383 partition_column_values=OrderedDict(), 384 maximum_rows_per_fragment=self._max_rows_per_fragment, 385 parquet_write_options=self._parquet_write_options, 386 ), 387 ) 388 else: 389 partition_values = df.select_seq(self.partition_columns).unique().to_dicts() 390 for partition_value in partition_values: 391 yield ( 392 df.filter( 393 reduce( 394 lambda a, b: a & b, 395 [pl.col(k) == v for k, v in partition_value.items()], 396 ) 397 ), 398 HivePartition( 399 fs=self.fs, 400 dataset_url=self.url, 401 partition_column_values=OrderedDict( 402 [ 403 (pcol, partition_value[pcol]) 404 for pcol in self.partition_columns 405 ] 406 ), 407 maximum_rows_per_fragment=self._max_rows_per_fragment, 408 parquet_write_options=self._parquet_write_options, 409 ), 410 ) 411 412 def write(self, df: pl.DataFrame) -> None: 413 self._check_partition_columns(df) 414 for partition_df, partition in self._partition_split(df): 415 partition.write(partition_df) 416 417 def append(self, df: pl.DataFrame) -> None: 418 self._check_partition_columns(df) 419 for partition_df, partition in self._partition_split(df): 420 partition.append(partition_df)