polario.hive_dataset
The hive dataset implementation
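A minimal usage sketch, assuming a local dataset directory and a single string-typed partition column named "date" (both illustrative); any fsspec-supported URL scheme should work the same way:

import polars as pl

from polario.hive_dataset import HiveDataset

ds = HiveDataset("file:///tmp/events_dataset", partition_columns=["date"])

df = pl.DataFrame(
    {
        "date": ["2023-01-01", "2023-01-02"],  # partition columns must be Utf8
        "value": [1, 2],
    }
)

ds.write(df)  # one partition directory per distinct "date" value

lf = ds.scan()  # lazily read everything back, partition columns included
if lf is not None:
    print(lf.collect())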
1"""The hive dataset implementation""" 2from collections import OrderedDict 3from functools import reduce 4from itertools import chain 5from typing import Iterable, Optional, Tuple, Type 6from urllib.parse import urlsplit 7from uuid import uuid4 8 9import fsspec 10import polars as pl 11from fsspec.spec import AbstractFileSystem 12 13DEFAULT_ROWS_PER_FRAGMENT = int(1e6) 14 15DEFAULT_PARQUET_WRITE_OPTIONS = { 16 "use_pyarrow": True, 17 "compression": "snappy", 18} 19 20 21def to_relative_location_from( 22 possible_prefix: str, base_location: str, location: str 23) -> str: 24 """Take a location and make it relative to the base location, stripping possible prefix from both""" 25 relative_location = location 26 if location.startswith(possible_prefix): 27 relative_location = relative_location[len(possible_prefix) :] 28 29 # If base location is not absolute, it might be somewhere in location 30 if not base_location.startswith("/"): 31 if base_location in relative_location: 32 relative_location = relative_location[ 33 relative_location.find(base_location) : 34 ] 35 36 relative_location = relative_location.lstrip("/") 37 scheme_less_url = base_location[len(possible_prefix) :].lstrip("/") 38 if relative_location.startswith(scheme_less_url): 39 relative_location = relative_location[len(scheme_less_url) + 1 :] 40 return relative_location 41 42 43class ParquetFragment: 44 """Pointer to a parquet fragment""" 45 46 def __init__(self, url: str, parquet_write_options: dict): 47 self.url = url 48 self.parquet_write_options = parquet_write_options 49 50 @classmethod 51 def first_fragment( 52 cls: Type["ParquetFragment"], 53 partition_base_url: str, 54 parquet_write_options: dict, 55 ) -> "ParquetFragment": 56 """Return a fragment name for what should be the first fragment in the partition""" 57 idx = 0 58 return cls( 59 f"{partition_base_url}/{idx:06}_{uuid4().hex}.parquet", 60 parquet_write_options, 61 ) 62 63 def next_fragment(self) -> "ParquetFragment": 64 """Return a fragment name for what should be the next fragment in the partition""" 65 idx = int(self.url.split("/")[-1].split("_")[0]) + 1 66 return ParquetFragment( 67 f"{'/'.join(self.url.split('/')[:-1])}/{idx:06}_{uuid4().hex}.parquet", 68 self.parquet_write_options, 69 ) 70 71 def read(self) -> pl.DataFrame: 72 """Read the fragment""" 73 return pl.read_parquet(self.url) 74 75 def scan(self) -> pl.LazyFrame: 76 """Read the fragment""" 77 return pl.scan_parquet(self.url) 78 79 def write(self, df: pl.DataFrame) -> None: 80 """Write the fragment""" 81 df.write_parquet(self.url, **self.parquet_write_options) 82 83 84class HivePartition: 85 """Pointer to a partition in a HiveDataset""" 86 87 def __init__( 88 self, 89 fs: AbstractFileSystem, 90 dataset_url: str, 91 partition_column_values: OrderedDict[str, str], 92 maximum_rows_per_fragment: int, 93 parquet_write_options: dict, 94 ) -> None: 95 self.fs = fs 96 self.partition_column_values = partition_column_values 97 self.maximum_rows_per_fragment = maximum_rows_per_fragment 98 self.url = (dataset_url + "/" + self.to_relative_path()).rstrip("/") 99 location = urlsplit(dataset_url) 100 self.scheme_prefix = location.scheme + "://" if location.scheme else "" 101 self._parquet_write_options = parquet_write_options 102 103 @classmethod 104 def from_relative_path( 105 cls: Type["HivePartition"], 106 fs: AbstractFileSystem, 107 dataset_url: str, 108 relative_path: str, 109 maximum_rows_per_fragment: int, 110 parquet_write_options: dict, 111 ) -> "HivePartition": 112 """Create a partition from a relative path""" 
113 relative_path_elements = relative_path.split("/") 114 if any(map(lambda x: "=" not in x, relative_path_elements)): 115 raise ValueError( 116 f"One or more parition path elements is missing an equal sign while parsing '{relative_path}' from '{dataset_url}'" 117 ) 118 119 return cls( 120 fs=fs, 121 dataset_url=dataset_url, 122 partition_column_values=OrderedDict( 123 map(lambda x: x.split("=", 1), relative_path_elements) 124 ), 125 maximum_rows_per_fragment=maximum_rows_per_fragment, 126 parquet_write_options=parquet_write_options, 127 ) 128 129 def to_relative_path(self) -> str: 130 """Create a relative path from partition column values""" 131 return "/".join([f"{k}={v}" for k, v in self.partition_column_values.items()]) 132 133 def fragment_urls(self) -> Iterable[str]: 134 try: 135 return map( 136 lambda p: self.url 137 + "/" 138 + to_relative_location_from(self.scheme_prefix, self.url, p), 139 self.fs.expand_path("/".join([self.url, "*.parquet"])), 140 ) 141 except FileNotFoundError: 142 return [] 143 144 def fragments(self) -> Iterable[ParquetFragment]: 145 """Discover fragments""" 146 return map( 147 lambda fragment_url: ParquetFragment( 148 fragment_url, 149 self._parquet_write_options, 150 ), 151 self.fragment_urls(), 152 ) 153 154 def read(self) -> Optional[pl.DataFrame]: 155 """Concat the fragments in this partition into a single dataframe""" 156 fragments = [f.read() for f in self.fragments()] 157 if len(fragments) > 1: 158 # Merge schemas from different fragments into a superset schema 159 superset_schema: dict[str, pl.PolarsDataType] = reduce( 160 lambda a, b: a | dict(b.schema), 161 fragments[1:], 162 dict(fragments[0].schema), 163 ) 164 165 def add_missing_columns(df: pl.DataFrame) -> pl.DataFrame: 166 missing_columns = superset_schema.keys() - set(df.columns) 167 if missing_columns: 168 return df.with_columns( 169 [ 170 pl.lit(None, superset_schema[col]).alias(col) 171 for col in missing_columns 172 ] 173 ) 174 else: 175 return df 176 177 complete_fragements = [ 178 add_missing_columns(f).select(superset_schema.keys()) for f in fragments 179 ] 180 else: 181 complete_fragements = fragments 182 183 if complete_fragements: 184 return pl.concat(complete_fragements).with_columns( 185 map( 186 lambda part: pl.lit(part[1]).alias(part[0]), 187 self.partition_column_values.items(), 188 ) 189 ) 190 return None 191 192 def scan(self) -> Optional[pl.LazyFrame]: 193 """Concat the fragments in this partition into a single dataframe""" 194 fragments = [f.scan() for f in self.fragments()] 195 if fragments: 196 return pl.concat(fragments).with_columns( 197 map( 198 lambda part: pl.lit(part[1]).alias(part[0]), 199 self.partition_column_values.items(), 200 ) 201 ) 202 return None 203 204 def _write_to_fragments( 205 self, df: pl.DataFrame, target_fragment: ParquetFragment 206 ) -> None: 207 output_columns = list( 208 sorted(set(df.columns) - self.partition_column_values.keys()) 209 ) 210 for fragment_df in df.select(output_columns).iter_slices( 211 self.maximum_rows_per_fragment 212 ): 213 target_fragment.write(fragment_df) 214 target_fragment = target_fragment.next_fragment() 215 216 def delete(self) -> None: 217 """Delete the partition""" 218 if self.fs.exists(self.url): 219 self.fs.delete(self.url, recursive=True) 220 221 def write(self, df: pl.DataFrame) -> None: 222 """Write the dataframe to this partition""" 223 self.delete() 224 self.fs.mkdir(self.url) 225 target_fragment = ParquetFragment.first_fragment( 226 self.url, self._parquet_write_options 227 ) 228 self._write_to_fragments(df, 
target_fragment) 229 230 def append(self, df: pl.DataFrame) -> None: 231 """Write the dataframe to this partition""" 232 if not self.fs.exists(self.url): 233 self.fs.mkdir(self.url) 234 235 new_fragment = ParquetFragment.first_fragment( 236 self.url, self._parquet_write_options 237 ) 238 try: 239 *_, last_fragment = self.fragments() 240 new_fragment = last_fragment.next_fragment() 241 except ValueError: 242 pass 243 self._write_to_fragments(df, new_fragment) 244 245 246class HiveDataset: 247 """Handle to multiple partitions""" 248 249 def __init__( 250 self, 251 url: str, 252 partition_columns: list[str] = [], 253 max_rows_per_fragment: int = DEFAULT_ROWS_PER_FRAGMENT, 254 parquet_write_options: dict = DEFAULT_PARQUET_WRITE_OPTIONS, 255 ) -> None: 256 self.url = url.rstrip("/") 257 # Load fsspec filesystem from url scheme 258 location = urlsplit(url) 259 self.fs = fsspec.filesystem(location.scheme) 260 self.scheme_prefix = location.scheme + "://" if location.scheme else "" 261 self.partition_columns = partition_columns 262 self._max_rows_per_fragment = max_rows_per_fragment 263 self._parquet_write_options = parquet_write_options 264 265 def partitions(self) -> Iterable[HivePartition]: 266 """Iterate over HivePartitions""" 267 if self.partition_columns: 268 glob_pattern = HivePartition( 269 fs=self.fs, 270 dataset_url=self.url, 271 partition_column_values=OrderedDict( 272 {k: "*" for k in self.partition_columns} 273 ), 274 maximum_rows_per_fragment=self._max_rows_per_fragment, 275 parquet_write_options=self._parquet_write_options, 276 ).to_relative_path() 277 try: 278 partitions = self.fs.expand_path(self.url + "/" + glob_pattern) 279 280 return map( 281 lambda path: HivePartition.from_relative_path( 282 fs=self.fs, 283 dataset_url=self.url, 284 relative_path=to_relative_location_from( 285 self.scheme_prefix, self.url, path 286 ), 287 maximum_rows_per_fragment=self._max_rows_per_fragment, 288 parquet_write_options=self._parquet_write_options, 289 ), 290 sorted(partitions), 291 ) 292 except FileNotFoundError: 293 return [] 294 else: 295 return [ 296 HivePartition( 297 fs=self.fs, 298 dataset_url=self.url, 299 partition_column_values=OrderedDict(), 300 maximum_rows_per_fragment=self._max_rows_per_fragment, 301 parquet_write_options=self._parquet_write_options, 302 ) 303 ] 304 305 def read_partitions(self) -> Iterable[pl.DataFrame]: 306 """Iterate over partitions""" 307 for partition in self.partitions(): 308 df = partition.read() 309 if df is not None: 310 yield df 311 312 def read_partition( 313 self, partition_column_values: dict[str, str] 314 ) -> Optional[pl.DataFrame]: 315 """Read the given partition from the dataset""" 316 if set(partition_column_values.keys()) != set(self.partition_columns): 317 raise ValueError( 318 f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}" 319 ) 320 return HivePartition( 321 fs=self.fs, 322 dataset_url=self.url, 323 partition_column_values=OrderedDict(partition_column_values), 324 maximum_rows_per_fragment=self._max_rows_per_fragment, 325 parquet_write_options=self._parquet_write_options, 326 ).read() 327 328 def delete_partition(self, partition_column_values: dict[str, str]) -> None: 329 """Read the given partition from the dataset""" 330 if set(partition_column_values.keys()) != set(self.partition_columns): 331 raise ValueError( 332 f"Partition column value keys {partition_column_values} do not match partition columns {self.partition_columns}" 333 ) 334 return HivePartition( 335 fs=self.fs, 336 
dataset_url=self.url, 337 partition_column_values=OrderedDict(partition_column_values), 338 maximum_rows_per_fragment=self._max_rows_per_fragment, 339 parquet_write_options=self._parquet_write_options, 340 ).delete() 341 342 def scan_partitions(self) -> Iterable[pl.LazyFrame]: 343 """Iterate over partitions""" 344 for partition in self.partitions(): 345 df = partition.scan() 346 if df is not None: 347 yield df 348 349 def scan(self) -> Optional[pl.LazyFrame]: 350 iterable = iter(self.scan_partitions()) 351 first_partition = next(iterable, None) 352 if first_partition is not None: 353 return pl.concat(chain([first_partition], iterable)) 354 return None 355 356 def _check_partition_columns(self, df: pl.DataFrame) -> None: 357 """Check if the given dataframe fits in this dataset""" 358 if not set(df.columns).issuperset(set(self.partition_columns)) or len( 359 df.columns 360 ) <= len(self.partition_columns): 361 raise ValueError( 362 f"Dataframe should have more columns, require at least {self.partition_columns}, got {df.columns}" 363 ) 364 for pcol in self.partition_columns: 365 if df[pcol].dtype != pl.Utf8: 366 raise ValueError( 367 f"Partition column {pcol} is not a string, but {df[pcol].dtype}" 368 ) 369 370 def _partition_split( 371 self, df: pl.DataFrame 372 ) -> Iterable[Tuple[pl.DataFrame, HivePartition]]: 373 """Split dataframe into partitions and partition dataframes""" 374 self._check_partition_columns(df) 375 if self.partition_columns == []: 376 yield df, HivePartition( 377 fs=self.fs, 378 dataset_url=self.url, 379 partition_column_values=OrderedDict(), 380 maximum_rows_per_fragment=self._max_rows_per_fragment, 381 parquet_write_options=self._parquet_write_options, 382 ) 383 else: 384 partition_values = df.select(self.partition_columns).unique().to_dicts() 385 for partition_value in partition_values: 386 yield df.filter( 387 reduce( 388 lambda a, b: a & b, 389 [pl.col(k) == v for k, v in partition_value.items()], 390 ) 391 ), HivePartition( 392 fs=self.fs, 393 dataset_url=self.url, 394 partition_column_values=OrderedDict( 395 [ 396 (pcol, partition_value[pcol]) 397 for pcol in self.partition_columns 398 ] 399 ), 400 maximum_rows_per_fragment=self._max_rows_per_fragment, 401 parquet_write_options=self._parquet_write_options, 402 ) 403 404 def write(self, df: pl.DataFrame) -> None: 405 self._check_partition_columns(df) 406 for partition_df, partition in self._partition_split(df): 407 partition.write(partition_df) 408 409 def append(self, df: pl.DataFrame) -> None: 410 self._check_partition_columns(df) 411 for partition_df, partition in self._partition_split(df): 412 partition.append(partition_df)
def to_relative_location_from(possible_prefix: str, base_location: str, location: str) -> str:
Take a location and make it relative to the base location, stripping possible prefix from both
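For example, with an illustrative s3-style layout, stripping the scheme prefix and the dataset base from an absolute object path leaves the partition-relative path:

from polario.hive_dataset import to_relative_location_from

# Hypothetical bucket and dataset names, used only to illustrate the stripping behaviour
print(
    to_relative_location_from(
        "s3://",                                             # possible_prefix
        "s3://bucket/dataset",                               # base_location
        "s3://bucket/dataset/year=2023/000000_abc.parquet",  # location
    )
)
# -> year=2023/000000_abc.parquet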
class ParquetFragment:
Pointer to a parquet fragment
@classmethod
def first_fragment(cls: Type["ParquetFragment"], partition_base_url: str, parquet_write_options: dict) -> "ParquetFragment":
Return a fragment name for what should be the first fragment in the partition
def next_fragment(self) -> "ParquetFragment":
Return a fragment name for what should be the next fragment in the partition
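Fragment names combine a zero-padded index with a random hex suffix, so the next fragment always sorts after the current one. A sketch with an illustrative partition URL:

from polario.hive_dataset import ParquetFragment

first = ParquetFragment.first_fragment("file:///tmp/ds/date=2023-01-01", {})
print(first.url)   # e.g. file:///tmp/ds/date=2023-01-01/000000_<hex>.parquet

second = first.next_fragment()
print(second.url)  # e.g. file:///tmp/ds/date=2023-01-01/000001_<hex>.parquet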
class HivePartition:
Pointer to a partition in a HiveDataset
def __init__(self, fs: AbstractFileSystem, dataset_url: str, partition_column_values: OrderedDict[str, str], maximum_rows_per_fragment: int, parquet_write_options: dict) -> None:
@classmethod
def from_relative_path(cls: Type["HivePartition"], fs: AbstractFileSystem, dataset_url: str, relative_path: str, maximum_rows_per_fragment: int, parquet_write_options: dict) -> "HivePartition":
Create a partition from a relative path
def to_relative_path(self) -> str:
Create a relative path from partition column values
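The partition column values and the relative path round-trip through from_relative_path and to_relative_path. A sketch assuming a local filesystem and illustrative column names:

import fsspec

from polario.hive_dataset import HivePartition

part = HivePartition.from_relative_path(
    fs=fsspec.filesystem("file"),
    dataset_url="file:///tmp/events_dataset",
    relative_path="year=2023/month=01",
    maximum_rows_per_fragment=1_000_000,
    parquet_write_options={},
)
print(part.partition_column_values)  # OrderedDict with year='2023', month='01'
print(part.to_relative_path())       # year=2023/month=01
print(part.url)                      # file:///tmp/events_dataset/year=2023/month=01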
def fragments(self) -> Iterable[ParquetFragment]:
Discover fragments
def read(self) -> Optional[pl.DataFrame]:
Concat the fragments in this partition into a single dataframe
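When fragments were written with different schemas, read() builds a superset schema, pads missing columns with typed nulls, concatenates, and finally adds the partition columns as string literals. A minimal sketch of that merging step in isolation (the frames are illustrative):

import polars as pl

# Two "fragments" with partially overlapping schemas, as might accumulate through appends
a = pl.DataFrame({"id": [1, 2], "value": [0.1, 0.2]})
b = pl.DataFrame({"id": [3], "comment": ["column added later"]})

superset = dict(a.schema) | dict(b.schema)  # mirrors the reduce over fragment schemas

def pad(df: pl.DataFrame) -> pl.DataFrame:
    missing = superset.keys() - set(df.columns)
    return df.with_columns([pl.lit(None, superset[c]).alias(c) for c in missing])

merged = pl.concat([pad(f).select(list(superset.keys())) for f in (a, b)])
print(merged)  # "value" is null for rows from b, "comment" is null for rows from a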
def scan(self) -> Optional[pl.LazyFrame]:
Lazily concat the fragments in this partition into a single LazyFrame
def delete(self) -> None:
Delete the partition
def write(self, df: pl.DataFrame) -> None:
Write the dataframe to this partition
def append(self, df: pl.DataFrame) -> None:
Append the dataframe to this partition
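write() deletes the partition directory and starts again at fragment 000000, while append() adds a fresh fragment after the current last one. A sketch against an illustrative local partition:

import fsspec
import polars as pl

from polario.hive_dataset import DEFAULT_PARQUET_WRITE_OPTIONS, HivePartition

part = HivePartition.from_relative_path(
    fs=fsspec.filesystem("file"),
    dataset_url="file:///tmp/events_dataset",
    relative_path="date=2023-01-01",
    maximum_rows_per_fragment=1_000_000,
    parquet_write_options=DEFAULT_PARQUET_WRITE_OPTIONS,
)

df = pl.DataFrame({"value": [1, 2, 3]})

part.write(df)   # replaces any existing fragments -> 000000_<hex>.parquet
part.append(df)  # keeps 000000_... and adds 000001_<hex>.parquet
print([url.rsplit("/", 1)[-1] for url in part.fragment_urls()])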
class HiveDataset:
Handle to multiple partitions
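The constructor only records configuration; the fsspec filesystem is chosen from the URL scheme. An illustrative instantiation that overrides the fragment size and parquet options:

from polario.hive_dataset import HiveDataset

ds = HiveDataset(
    "file:///data/events",          # illustrative URL; s3:// etc. work via fsspec as well
    partition_columns=["date"],     # must be Utf8 columns in every dataframe written
    max_rows_per_fragment=500_000,  # each fragment holds at most this many rows
    parquet_write_options={"use_pyarrow": True, "compression": "zstd"},
)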
def __init__(self, url: str, partition_columns: list[str] = [], max_rows_per_fragment: int = DEFAULT_ROWS_PER_FRAGMENT, parquet_write_options: dict = DEFAULT_PARQUET_WRITE_OPTIONS) -> None:
def partitions(self) -> Iterable[HivePartition]:
Iterate over HivePartitions
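Discovery globs one column=* path segment per partition column under the dataset URL. A sketch, assuming the illustrative dataset from the earlier example has been written:

from polario.hive_dataset import HiveDataset

ds = HiveDataset("file:///tmp/events_dataset", partition_columns=["date"])

# Globs file:///tmp/events_dataset/date=* and yields one HivePartition per match
for part in ds.partitions():
    print(part.to_relative_path())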
def read_partitions(self) -> Iterable[pl.DataFrame]:
Iterate over partitions
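Each partition is yielded as its own DataFrame with the partition columns re-attached as string columns, which allows per-partition processing without concatenating the whole dataset. A sketch:

from polario.hive_dataset import HiveDataset

ds = HiveDataset("file:///tmp/events_dataset", partition_columns=["date"])  # illustrative

for df in ds.read_partitions():
    print(df["date"][0], df.height)  # partition value and row count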
def read_partition(self, partition_column_values: dict[str, str]) -> Optional[pl.DataFrame]:
Read the given partition from the dataset
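The keys of partition_column_values must match the dataset's partition columns exactly, otherwise a ValueError is raised; a missing or empty partition yields None. A sketch:

from polario.hive_dataset import HiveDataset

ds = HiveDataset("file:///tmp/events_dataset", partition_columns=["date"])  # illustrative

df = ds.read_partition({"date": "2023-01-01"})
if df is None:
    print("partition has no fragments")
else:
    print(df)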
def delete_partition(self, partition_column_values: dict[str, str]) -> None:
Delete the given partition from the dataset
def scan_partitions(self) -> Iterable[pl.LazyFrame]:
Iterate over partitions
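scan_partitions() yields LazyFrames and HiveDataset.scan() chains them into a single lazy query, so filters and projections can be applied before anything is collected. A sketch:

import polars as pl

from polario.hive_dataset import HiveDataset

ds = HiveDataset("file:///tmp/events_dataset", partition_columns=["date"])  # illustrative

lf = ds.scan()
if lf is not None:
    print(lf.filter(pl.col("date") == "2023-01-01").collect())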