
Add num_proc= to .push_to_hub() (Dataset and IterableDataset) #7606


Merged · 3 commits · Jun 11, 2025
src/datasets/arrow_dataset.py (118 additions, 52 deletions)
@@ -5397,17 +5397,76 @@ def to_iterable_dataset(self, num_shards: Optional[int] = 1) -> "IterableDataset
ds = ds.with_format(self._format_type)
return ds

def _push_parquet_shards_to_hub_single(
self,
job_id: int,
num_jobs: int,
repo_id: str,
data_dir: str,
split: str,
token: Optional[str],
revision: Optional[str],
create_pr: Optional[bool],
num_shards: int,
embed_external_files: bool,
):
div = num_shards // num_jobs
mod = num_shards % num_jobs
start = div * job_id + min(job_id, mod)
end = start + div + (1 if job_id < mod else 0)

index_shards = (
(start + i, self.shard(num_shards=end - start, index=i, contiguous=True)) for i in range(end - start)
)

api = HfApi(endpoint=config.HF_ENDPOINT, token=token)

uploaded_size = 0
additions: list[CommitOperationAdd] = []
for index, shard in index_shards:
if embed_external_files:
from .io.parquet import get_writer_batch_size

format = shard.format
shard = shard.with_format("arrow")
shard = shard.map(
embed_table_storage,
batched=True,
batch_size=get_writer_batch_size(shard.features),
keep_in_memory=True,
)
shard = shard.with_format(**format)
shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
buffer = BytesIO()
shard.to_parquet(buffer)
parquet_content = buffer.getvalue()
uploaded_size += len(parquet_content)
del buffer
shard_addition = CommitOperationAdd(path_in_repo=shard_path_in_repo, path_or_fileobj=parquet_content)
api.preupload_lfs_files(
repo_id=repo_id,
additions=[shard_addition],
repo_type="dataset",
revision=revision,
create_pr=create_pr,
)
additions.append(shard_addition)
yield job_id, False, 1

yield job_id, True, additions
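
A note on the slicing at the top of this new helper: each job is assigned a contiguous, near-even range of shard indices, with the first `num_shards % num_jobs` jobs taking one extra shard. A minimal sketch of that arithmetic (the `shard_range` helper is hypothetical, for illustration only, not part of the PR):

def shard_range(job_id: int, num_jobs: int, num_shards: int) -> range:
    # Same div/mod split as above: contiguous ranges, as even as possible,
    # with the first `mod` jobs taking one extra shard.
    div, mod = divmod(num_shards, num_jobs)
    start = div * job_id + min(job_id, mod)
    end = start + div + (1 if job_id < mod else 0)
    return range(start, end)

# e.g. 10 shards split over 4 jobs -> [0, 1, 2], [3, 4, 5], [6, 7], [8, 9]
print([list(shard_range(job_id, num_jobs=4, num_shards=10)) for job_id in range(4)])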

def _push_parquet_shards_to_hub(
self,
repo_id: str,
data_dir: str = "data",
split: Optional[str] = None,
token: Optional[str] = None,
revision: Optional[str] = None,
create_pr: Optional[bool] = False,
max_shard_size: Optional[Union[int, str]] = None,
num_shards: Optional[int] = None,
embed_external_files: bool = True,
data_dir: str,
split: str,
token: Optional[str],
revision: Optional[str],
create_pr: Optional[bool],
max_shard_size: Optional[Union[int, str]],
num_shards: Optional[int],
embed_external_files: bool,
num_proc: Optional[int],
) -> tuple[list[CommitOperationAdd], int, int]:
"""Pushes the dataset shards as Parquet files to the hub.

@@ -5416,66 +5475,65 @@ def _push_parquet_shards_to_hub(
uploaded_size (`int`): number of bytes uploaded to the repository
dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset once uncompressed
"""
dataset_nbytes = self._estimate_nbytes()

# Find decodable columns, because if there are any, we need to:
# embed the bytes from the files in the shards
decodable_columns = (
[k for k, v in self._info.features.items() if require_decoding(v, ignore_decode_attribute=True)]
if embed_external_files
else []
)

dataset_nbytes = self._estimate_nbytes()
embed_external_files = embed_external_files and bool(decodable_columns)

if num_shards is None:
max_shard_size = convert_file_size_to_int(max_shard_size or config.MAX_SHARD_SIZE)
num_shards = int(dataset_nbytes / max_shard_size) + 1
num_shards = max(num_shards, 1)

shards = (self.shard(num_shards=num_shards, index=i, contiguous=True) for i in range(num_shards))

if decodable_columns:
from .io.parquet import get_writer_batch_size

def shards_with_embedded_external_files(shards: Iterator[Dataset]) -> Iterator[Dataset]:
for shard in shards:
format = shard.format
shard = shard.with_format("arrow")
shard = shard.map(
embed_table_storage,
batched=True,
batch_size=get_writer_batch_size(shard.features),
keep_in_memory=True,
)
shard = shard.with_format(**format)
yield shard

shards = shards_with_embedded_external_files(shards)

api = HfApi(endpoint=config.HF_ENDPOINT, token=token)
num_shards = max(num_shards, num_proc or 1)

uploaded_size = 0
additions: list[CommitOperationAdd] = []
for index, shard in hf_tqdm(
enumerate(shards),
desc="Uploading the dataset shards",

num_jobs = num_proc or 1
kwargs_iterable = [
{
"self": self.shard(num_shards=num_jobs, index=job_id, contiguous=True),
"job_id": job_id,
"num_jobs": num_jobs,
"repo_id": repo_id,
"data_dir": data_dir,
"split": split,
"token": token,
"revision": revision,
"create_pr": create_pr,
"num_shards": num_shards,
"embed_external_files": embed_external_files,
}
for job_id in range(num_jobs)
]
desc = "Uploading the dataset shards"
desc += f" (num_proc={num_proc})" if num_proc is not None and num_proc > 1 else ""
pbar = hf_tqdm(
unit=" shards",
total=num_shards,
):
shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
buffer = BytesIO()
shard.to_parquet(buffer)
parquet_content = buffer.getvalue()
uploaded_size += len(parquet_content)
del buffer
shard_addition = CommitOperationAdd(path_in_repo=shard_path_in_repo, path_or_fileobj=parquet_content)
api.preupload_lfs_files(
repo_id=repo_id,
additions=[shard_addition],
repo_type="dataset",
revision=revision,
create_pr=create_pr,
desc=desc,
)
with contextlib.nullcontext() if num_proc is None or num_proc <= 1 else Pool(num_proc) as pool:
update_stream = (
Dataset._push_parquet_shards_to_hub_single(**kwargs_iterable[0])
if pool is None
else iflatmap_unordered(
pool,
Dataset._push_parquet_shards_to_hub_single,
kwargs_iterable=kwargs_iterable,
)
)
additions.append(shard_addition)
for job_id, done, content in update_stream:
if not done:
pbar.update(content)
else:
additions += content

uploaded_size = sum(addition.upload_info.size for addition in additions)
return additions, uploaded_size, dataset_nbytes
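
For readers following the control flow: each upload job is a generator that yields `(job_id, False, 1)` after every shard it pre-uploads, then a final `(job_id, True, additions)` carrying its list of `CommitOperationAdd` objects; the parent loop above turns the former into progress-bar ticks and the latter into commit operations. A minimal, self-contained sketch of that protocol (the `fake_upload_job` worker is made up for illustration and stands in for `_push_parquet_shards_to_hub_single`):

def fake_upload_job(job_id: int, num_my_shards: int):
    # Yields one progress tick per shard, then a final message with the commit operations.
    additions = []
    for shard_index in range(num_my_shards):
        additions.append(f"commit-op-{job_id}-{shard_index}")  # stand-in for a CommitOperationAdd
        yield job_id, False, 1
    yield job_id, True, additions

collected, uploaded_shards = [], 0
for job_id, done, content in fake_upload_job(job_id=0, num_my_shards=3):
    if not done:
        uploaded_shards += content   # the parent feeds this to pbar.update(content)
    else:
        collected += content         # the parent does `additions += content`
print(uploaded_shards, len(collected))  # -> 3 3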

def push_to_hub(
@@ -5494,6 +5552,7 @@ def push_to_hub(
max_shard_size: Optional[Union[int, str]] = None,
num_shards: Optional[int] = None,
embed_external_files: bool = True,
num_proc: Optional[int] = None,
) -> CommitInfo:
"""Pushes the dataset to the hub as a Parquet dataset.
The dataset is pushed using HTTP requests and does not require git or git-lfs to be installed.
@@ -5553,6 +5612,12 @@ def push_to_hub(
In particular, this will do the following before the push for the fields of type:

- [`Audio`] and [`Image`]: remove local path information and embed file content in the Parquet files.
num_proc (`int`, *optional*, defaults to `None`):
Number of processes to use when preparing and uploading the dataset.
This is helpful if the dataset consists of many samples or contains media files to embed.
Multiprocessing is disabled by default.

<Added version="4.0.0"/>

Return:
huggingface_hub.CommitInfo
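
For reference, a usage sketch of the new `num_proc` argument (the repository id and local image folder are placeholders, for illustration only):

from datasets import load_dataset

# Hypothetical local image folder and repository id.
ds = load_dataset("imagefolder", data_dir="./my_images", split="train")

# Prepare the shards (embedding the image bytes) and pre-upload them with 4 worker processes.
ds.push_to_hub("username/my-image-dataset", num_shards=8, num_proc=4)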
@@ -5636,6 +5701,7 @@ def push_to_hub(
num_shards=num_shards,
create_pr=create_pr,
embed_external_files=embed_external_files,
num_proc=num_proc,
)

# Check if the repo already has a README.md and/or a dataset_infos.json to update them with the new split info (size and pattern)