From 34717b42c7a40632f7a04f6bd0ebd62ec74c964e Mon Sep 17 00:00:00 2001 From: shyam-cb <131658626+shyam-cb@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:43:35 +0530 Subject: [PATCH] feat: hugging face to couchbase migration initail code (#2) * initial code * token fix * workflow update --- .github/workflows/release.yaml | 166 ++++++++++++ README.md | 11 +- hf_to_cb_dataset_migrator/cli.py | 161 ++++++++++++ hf_to_cb_dataset_migrator/migration.py | 345 +++++++++++++++++++++++++ hf_to_cb_dataset_migrator/utils.py | 0 requirements.txt | 3 + tests/test_cli.py | 98 +++++++ tests/test_migration.py | 61 +++++ 8 files changed, 843 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/release.yaml create mode 100644 hf_to_cb_dataset_migrator/cli.py create mode 100644 hf_to_cb_dataset_migrator/migration.py create mode 100644 hf_to_cb_dataset_migrator/utils.py create mode 100644 requirements.txt create mode 100644 tests/test_cli.py create mode 100644 tests/test_migration.py diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..01d0178 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,166 @@ +name: Build and Release + +on: + push: + tags: + - 'v*.*.*' # Triggers on version tags like v0.2.0 + workflow_dispatch: # Allows manual triggering + +jobs: + build: + strategy: + matrix: + include: + - os: windows-latest + os_name: windows + arch: x86_64 + arch_name: amd64 + extension: zip + - os: macos-latest + os_name: darwin + arch: x86_64 + arch_name: amd64 + extension: tar.gz + - os: macos-12-arm64 + os_name: darwin + arch: arm64 + arch_name: arm64 + extension: tar.gz + - os: ubuntu-latest + os_name: linux + arch: x86_64 + arch_name: amd64 + extension: tar.gz + - os: ubuntu-latest + os_name: linux + arch: arm64 + arch_name: arm64 + extension: tar.gz + runs-on: ${{ matrix.os }} + steps: + - name: Checkout code + uses: actions/checkout@v3 + + # Step to extract the version number + - name: Set Version + id: get_version + shell: bash + run: | + VERSION="${GITHUB_REF#refs/tags/}" + VERSION="${VERSION#v}" + echo "VERSION=$VERSION" >> $GITHUB_ENV + echo "VERSION=$VERSION" >> $GITHUB_OUTPUT + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.9' + architecture: ${{ matrix.arch }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pyinstaller + + - name: Build with PyInstaller in directory mode + env: + ARCHFLAGS: ${{ runner.os == 'macOS' && matrix.arch == 'arm64' && '-arch arm64' || '' }} + run: | + pyinstaller your_script.py --name hf_to_cb_dataset_migrator + + # Code-signing and notarization steps for macOS + - name: Code-sign on macOS + if: runner.os == 'macOS' + env: + CERTIFICATE: ${{ secrets.APPLE_DEV_CERT }} + CERT_PASSWORD: ${{ secrets.APPLE_DEV_CERT_PASSPHRASE }} + KEYCHAIN_PASSWORD: ${{ secrets.KEYCHAIN_PASSWORD }} + run: | + echo "$CERTIFICATE" | base64 --decode > certificate.p12 + security create-keychain -p "$KEYCHAIN_PASSWORD" build.keychain + security import certificate.p12 -k build.keychain -P "$CERT_PASSWORD" -T /usr/bin/codesign + security set-keychain-settings -lut 21600 build.keychain + security list-keychains -s build.keychain + security unlock-keychain -p "$KEYCHAIN_PASSWORD" build.keychain + # Sign the main executable + codesign --force --options runtime --sign "Developer ID Application: Your Name (Team ID)" dist/hf_to_cb_dataset_migrator/hf_to_cb_dataset_migrator + # Sign all dynamic libraries and 
executables + find dist/hf_to_cb_dataset_migrator -type f \( -name "*.so" -or -name "*.dylib" -or -perm -u=x \) -exec codesign --force --options runtime --sign "Developer ID Application: Your Name (Team ID)" {} \; + # Verify the code-signing + codesign --verify --deep --strict --verbose=2 dist/hf_to_cb_dataset_migrator/hf_to_cb_dataset_migrator + + # Compression and notarization for macOS + - name: Compress Application Directory on macOS + if: runner.os == 'macOS' + shell: bash + env: + VERSION: ${{ env.VERSION }} + run: | + APP_NAME="hf_to_cb_dataset_migrator_${VERSION}_${{ matrix.os_name }}_${{ matrix.arch_name }}" + cd dist + zip -r "../$APP_NAME.zip" hf_to_cb_dataset_migrator + + - name: Notarize the macOS binary + if: runner.os == 'macOS' + env: + APPLE_ID: ${{ secrets.APPLE_ID }} + APPLE_APP_PASSWORD: ${{ secrets.APPLE_APP_PASSWORD }} + APPLE_TEAM_ID: ${{ secrets.APPLE_TEAM_ID }} + VERSION: ${{ env.VERSION }} + run: | + APP_NAME="hf_to_cb_dataset_migrator_${VERSION}_${{ matrix.os_name }}_${{ matrix.arch_name }}" + xcrun notarytool submit "$APP_NAME.zip" --apple-id "$APPLE_ID" --password "$APPLE_APP_PASSWORD" --team-id "$APPLE_TEAM_ID" --wait + # Staple the notarization ticket + xcrun stapler staple "$APP_NAME.zip" + + # Compression for Linux + - name: Compress Application Directory on Linux + if: runner.os == 'Linux' + shell: bash + env: + VERSION: ${{ env.VERSION }} + run: | + APP_NAME="hf_to_cb_dataset_migrator_${VERSION}_${{ matrix.os_name }}_${{ matrix.arch_name }}" + tar -czvf "$APP_NAME.${{ matrix.extension }}" -C dist hf_to_cb_dataset_migrator + + # Compression for Windows + - name: Compress Application Directory on Windows + if: runner.os == 'Windows' + shell: powershell + env: + VERSION: ${{ env.VERSION }} + run: | + $APP_NAME = "hf_to_cb_dataset_migrator_$Env:VERSION_${{ matrix.os_name }}_${{ matrix.arch_name }}" + Compress-Archive -Path dist\hf_to_cb_dataset_migrator\* -DestinationPath "$APP_NAME.${{ matrix.extension }}" + + - name: Upload artifact + uses: actions/upload-artifact@v3 + with: + name: ${{ matrix.os_name }}_${{ matrix.arch_name }} + path: | + hf_to_cb_dataset_migrator_*_${{ matrix.os_name }}_${{ matrix.arch_name }}.* + + - name: Create Release + if: ${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') }} + id: create_release + uses: actions/create-release@v1 + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ env.VERSION }} + draft: false + prerelease: false + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Upload Release Asset + if: ${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v') }} + uses: actions/upload-release-asset@v1 + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: | + hf_to_cb_dataset_migrator_*_${{ matrix.os_name }}_${{ matrix.arch_name }}.* + asset_name: hf_to_cb_dataset_migrator_${{ env.VERSION }}_${{ matrix.os_name }}_${{ matrix.arch_name }}.${{ matrix.extension }} + asset_content_type: application/octet-stream + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/README.md b/README.md index 906124b..84ef46f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,9 @@ -# hf-to-cb-dataset-migrator -Hugging face to Couchbase dataset migrator +# Hugging Face Dataset to Couchbase Migrator + +A CLI tool to interact with Hugging Face datasets and migrate them to Couchbase, with support for streaming data. 
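As a quick orientation before the installation steps below, here is a sketch of how the tool might be invoked once installed. The `hf_to_cb_dataset_migrator` command name mirrors the binary name used in the PyInstaller build and the dataset/bucket values are placeholders; the flags themselves (`--path`, `--id-fields`, `--cb-url`, and so on) are the ones defined in `cli.py` in this change.

```bash
# Inspect a dataset before migrating (the dataset name is a placeholder).
hf_to_cb_dataset_migrator list-configs --path imdb
hf_to_cb_dataset_migrator list-splits --path imdb --json-output

# Stream the train split into Couchbase; the password is prompted interactively.
# --id-fields assumes the dataset has an "id" column; omit it to fall back to generated UUIDs.
hf_to_cb_dataset_migrator migrate \
  --path imdb --split train --id-fields id \
  --cb-url couchbase://localhost --cb-username Administrator \
  --cb-bucket hf_data --cb-scope _default --cb-collection _default
```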
+ +## Installation + +```bash +pip install -r requirements.txt +python setup.py install \ No newline at end of file diff --git a/hf_to_cb_dataset_migrator/cli.py b/hf_to_cb_dataset_migrator/cli.py new file mode 100644 index 0000000..0fb3155 --- /dev/null +++ b/hf_to_cb_dataset_migrator/cli.py @@ -0,0 +1,161 @@ +# my_cli/cli.py + +import click +import json +from migration import DatasetMigrator +from typing import Any + +@click.group() +def main(): + """CLI tool to interact with Hugging Face datasets and migrate them to Couchbase.""" + pass + +@main.command('list-configs') +@click.option('--path', required=True, help='Path or name of the dataset.') +@click.option('--revision', default=None, help='Version of the dataset script to load (optional).') +@click.option('--download-config', default=None, help='Specific download configuration parameters (optional).') +@click.option('--download-mode', type=click.Choice(['reuse_dataset_if_exists', 'force_redownload']), default=None, + help='Download mode (optional).') +@click.option('--dynamic-modules-path', default=None, help='Path to dynamic modules (optional).') +@click.option('--data-files', default=None, multiple=True, help='Path(s) to source data file(s) (optional).') +@click.option('--token',default=None, help='Use authentication token for private datasets.') +@click.option('--json-output', is_flag=True, help='Output the configurations in JSON format.') +def list_configs_cmd(path, revision, download_config, download_mode, dynamic_modules_path, + data_files, token, json_output): + """List all configuration names for a given dataset.""" + migrator = DatasetMigrator(token=token) + download_kwargs = { + 'revision': revision, + 'download_config': json.load(download_config) if download_config else None, + 'download_mode': download_mode, + 'dynamic_modules_path': dynamic_modules_path, + 'data_files': data_files if data_files else None, + } + # Remove None values + download_kwargs = {k: v for k, v in download_kwargs.items() if v is not None} + + configs = migrator.list_configs(path, **download_kwargs) + if configs: + if json_output: + click.echo(json.dumps(configs, indent=2)) + else: + click.echo(f"Available configurations for '{path}':") + for config in configs: + click.echo(f"- {config}") + else: + click.echo(f"No configurations found for dataset '{path}' or dataset not found.") + +@main.command('list-splits') +@click.option('--path', required=True, help='Path or name of the dataset.') +@click.option('--name', 'config_name', default=None, help='Configuration name of the dataset (optional).') +@click.option('--data-files', default=None, multiple=True, help='Path(s) to source data file(s) (optional).') +@click.option('--download-config', default=None, help='Specific download configuration parameters (optional).') +@click.option('--download-mode', type=click.Choice(['reuse_dataset_if_exists', 'force_redownload']), default=None, + help='Download mode (optional).') +@click.option('--revision', default=None, help='Version of the dataset script to load (optional).') +@click.option('--token', default=None, help='Authentication token for private datasets (optional).') +@click.option('--json-output', is_flag=True, help='Output the splits in JSON format.') +def list_splits_cmd(path, config_name, data_files, download_config, download_mode, revision, token, + json_output): + """List all available splits for a given dataset and configuration.""" + migrator = DatasetMigrator(token=token) + + config_kwargs = { + 'data_files': data_files if data_files else None, + 
'download_config': json.load(download_config) if download_config else None, + 'download_mode': download_mode, + 'revision': revision, + } + # Remove None values + config_kwargs = {k: v for k, v in config_kwargs.items() if v is not None} + + splits = migrator.list_splits(path, config_name=config_name, **config_kwargs) + if splits: + if json_output: + click.echo(json.dumps(splits, indent=2)) + else: + config_name_display = config_name if config_name else "default" + click.echo(f"Available splits for dataset '{path}' with config '{config_name_display}':") + for split in splits: + click.echo(f"- {split}") + else: + click.echo(f"No splits found for dataset '{path}' with config '{config_name}' or dataset not found.") + +@main.command() +@click.option('--path', required=True, help='Path or name of the dataset.') +@click.option('--name', default=None, help='Configuration name of the dataset (optional).') +@click.option('--data-dir', default=None, help='Directory with the data files (optional).') +@click.option('--data-files', default=None, multiple=True, help='Path(s) to source data file(s) (optional).') +@click.option('--split', default=None, help='Which split of the data to load (optional).') +@click.option('--cache-dir', default=None, help='Cache directory to store the datasets (optional).') +#@click.option('--features', default=None, help='Set of features to use (optional).') +@click.option('--download-config', default=None, help='Specific download configuration parameters (optional).') +@click.option('--download-mode', type=click.Choice(['reuse_dataset_if_exists', 'force_redownload']), default=None, + help='Download mode (optional).') +@click.option('--verification-mode', type=click.Choice(['no_checks', 'basic_checks', 'all_checks']), default=None, + help='Verification mode (optional).') +@click.option('--keep-in-memory', is_flag=True, default=False, help='Keep the dataset in memory (optional).') +@click.option('--save-infos', is_flag=True, default=False, help='Save dataset information (default: False).') +@click.option('--revision', default=None, help='Version of the dataset script to load (optional).') +@click.option('--token', default=None, help='Authentication token for private datasets (optional).') +@click.option('--streaming/--no-streaming', default=True, help='Load the dataset in streaming mode (default: True).') +@click.option('--num-proc', default=None, type=int, help='Number of processes to use (optional).') +@click.option('--storage-options', default=None, help='Storage options for remote filesystems (optional).') +@click.option('--trust-remote-code', is_flag=True, default=None, + help='Allow loading arbitrary code from the dataset repository (optional).') +@click.option('--id-fields', default=None, help='Comma-separated list of field names to use as document ID.') +@click.option('--cb-url', prompt='Couchbase URL', help='Couchbase cluster URL (e.g., couchbase://localhost).') +@click.option('--cb-username', prompt='Couchbase username', help='Username for Couchbase authentication.') +@click.option('--cb-password', prompt=True, hide_input=True, confirmation_prompt=False, + help='Password for Couchbase authentication.') +@click.option('--cb-bucket', prompt='Couchbase bucket name', help='Couchbase bucket to store data.') +@click.option('--cb-scope', default=None, help='Couchbase scope name (optional).') +@click.option('--cb-collection', default=None, help='Couchbase collection name (optional).') +def migrate( + path, name, data_dir, data_files, split, cache_dir, + #features, + 
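+    # The Features-based option is left commented out in this initial version,
+    # both here and in the corresponding @click.option above.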
download_config, download_mode, + verification_mode, keep_in_memory, save_infos, revision, token, streaming, num_proc, storage_options, + trust_remote_code, id_fields, cb_url, cb_username, cb_password, cb_bucket, cb_scope, cb_collection +): + """Migrate datasets from Hugging Face to Couchbase.""" + click.echo(f"Starting migration of dataset '{path}' to Couchbase bucket '{cb_bucket}'...") + migrator = DatasetMigrator(token=token) + + # Prepare data_files + data_files = list(data_files) if data_files else None + + result = migrator.migrate_dataset( + path=path, + cb_url=cb_url, + cb_username=cb_username, + cb_password=cb_password, + couchbase_bucket=cb_bucket, + cb_scope=cb_scope, + cb_collection=cb_collection, + id_fields=id_fields, + name=name, + data_dir=data_dir, + data_files=data_files, + split=split, + cache_dir=cache_dir, + #features=features, + download_config=download_config, + download_mode=download_mode, + verification_mode=verification_mode, + keep_in_memory=keep_in_memory, + save_infos=save_infos, + revision=revision, + token=token, + streaming=streaming, + num_proc=num_proc, + storage_options=json.loads(storage_options) if storage_options else None, + trust_remote_code=trust_remote_code, + ) + if result: + click.echo("Migration completed successfully.") + else: + click.echo("Migration failed.") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/hf_to_cb_dataset_migrator/migration.py b/hf_to_cb_dataset_migrator/migration.py new file mode 100644 index 0000000..c9146b9 --- /dev/null +++ b/hf_to_cb_dataset_migrator/migration.py @@ -0,0 +1,345 @@ +# my_cli/migration.py + +import couchbase.collection +from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names +from datasets.utils.logging import set_verbosity_error +from datasets.download import DownloadConfig +from datasets.download.download_manager import DownloadMode +from datasets.utils.info_utils import VerificationMode +from datasets.features import Features +from datasets.utils import Version +from datasets import DatasetDict, Dataset, IterableDatasetDict, IterableDataset, Split +import couchbase +from couchbase.cluster import Cluster +from couchbase.options import ClusterOptions, KnownConfigProfiles +from couchbase.auth import PasswordAuthenticator +from couchbase.exceptions import CouchbaseException, DocumentExistsException +from couchbase.result import MultiMutationResult +from datetime import timedelta +import uuid +import logging +from typing import Any, Dict, List, Union, Optional, Sequence, Mapping + +logger = logging.getLogger(__name__) + +class DatasetMigrator: + def __init__(self, token: Optional[str] = None): + """ + Initializes the DatasetMigrator with optional API key for Hugging Face. 
+ + :param token: Hugging Face Token for accessing private datasets (optional) + """ + self.token = token + self.cluster = None + self.collection = None + + def connect(self, cb_url: str, cb_username: str, cb_password: str, couchbase_bucket: str, + cb_scope: Optional[str] = None, cb_collection: Optional[str] = None): + """Establishes a connection to the Couchbase cluster and gets the collection.""" + cluster_opts = ClusterOptions( + PasswordAuthenticator(cb_username, cb_password), + ) + cluster_opts.apply_profile(KnownConfigProfiles.WanDevelopment) + self.cluster = Cluster(cb_url, cluster_opts) + self.cluster.wait_until_ready(timedelta(seconds=60)) # Wait until cluster is ready + bucket = self.cluster.bucket(couchbase_bucket) + + # Get the collection + if cb_scope and cb_collection: + scope = bucket.scope(cb_scope) + self.collection = scope.collection(cb_collection) + else: + self.collection = bucket.default_collection() + + def close(self): + """Closes the connection to the Couchbase cluster.""" + if self.cluster: + self.cluster.close() + self.cluster = None + self.collection = None + + def list_configs(self, path: str, + revision: Union[str, Version, None] = None, + download_config: Optional[Dict] = None, + download_mode: Union[DownloadMode, str, None] = None, + dynamic_modules_path: Optional[str] = None, + data_files: Union[Dict, List, str, None] = None, + **config_kwargs: Any) -> Optional[List[str]]: + """ + Lists all configuration names for a specified dataset. + Parameters: + path (str): The path or name of the dataset. + revision (Union[str, Version, None], optional): The version or revision of the dataset script to load. + download_config (Optional[Dict], optional): Dictionary of download configuration parameters. + download_mode (Union[DownloadMode, str, None], optional): Specifies the download mode. + dynamic_modules_path (Optional[str], optional): Path to dynamic modules for custom processing. + data_files (Union[Dict, List, str, None], optional): Paths to source data files. + config_kwargs (Any): Additional keyword arguments for dataset configuration. + + Returns: + Optional[List[str]]: A list of configuration names if successful; None if an error occurs. + """ + try: + set_verbosity_error() # Suppress warnings + # Include API key if provided + if self.token: + config_kwargs['token'] = self.token + + configs = get_dataset_config_names( + path, + revision=revision, + download_config=DownloadConfig(**download_config) if download_config else None, + download_mode=download_mode, + dynamic_modules_path=dynamic_modules_path, + data_files=data_files, + **config_kwargs, + ) + return configs + except Exception as e: + logger.error(f"An error occurred while fetching configs: {e}") + return None + + def list_splits(self, path: str, + config_name: Optional[str] = None, + data_files: Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]], None] = None, + download_config: Optional[Dict] = None, + download_mode: Union[DownloadMode, str, None] = None, + revision: Union[str, Version, None] = None, + **config_kwargs: Any) -> Optional[List[str]]: + """ + List all available splits for a given dataset and configuration. + + Parameters: + path (str): Path or name of the dataset. + config_name (str, optional): Configuration name of the dataset. + data_files (Union[Dict, List, str, None], optional): Path(s) to source data file(s). + download_config (Optional[Dict], optional): Specific download configuration parameters. 
+ download_mode (Union[DownloadMode, str, None], optional): Specifies the download mode. + revision (Union[str, Version, None], optional): Version of the dataset script to load. + config_kwargs (Any): Additional keyword arguments for configuration. + + Returns: + Optional[List[str]]: A list of split names if successful; None if an error occurs. + """ + try: + set_verbosity_error() # Suppress warnings + # Include token if provided + if self.token: + config_kwargs['token'] = self.token + + splits = get_dataset_split_names( + path, + config_name=config_name, + data_files=data_files, + download_config=DownloadConfig(**download_config) if download_config else None, + download_mode=download_mode, + revision=revision, + **config_kwargs + ) + return splits + except Exception as e: + logger.error(f"An error occurred while fetching splits: {e}") + return None + + def migrate_dataset( + self, + path: str, + cb_url: str, + cb_username: str, + cb_password: str, + couchbase_bucket: str, + cb_scope: Optional[str] = None, + cb_collection: Optional[str] = None, + id_fields: Optional[str] = None, + name: Optional[str] = None, + data_dir: Optional[str] = None, + data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None, + split: Optional[Union[str, Split]] = None, + cache_dir: Optional[str] = None, + #features: Optional[Features] = None, + download_config: Optional[Dict] = None, + download_mode: Optional[Union[DownloadMode, str]] = None, + verification_mode: Optional[Union[VerificationMode, str]] = None, + keep_in_memory: Optional[bool] = None, + save_infos: bool = False, + revision: Optional[Union[str, Version]] = None, + streaming: bool = False, + num_proc: Optional[int] = None, + storage_options: Optional[Dict] = None, + trust_remote_code: Optional[bool] = None, + **config_kwargs: Any, + ) -> bool: + """ + Migrates a Hugging Face dataset to Couchbase using batch insertion. + + This function accepts all parameters from the `load_dataset` function to customize dataset loading. + + Parameters: + path (str): Path or name of the dataset. + cb_url (str): Couchbase cluster URL (e.g., couchbase://localhost). + cb_username (str): Username for Couchbase authentication. + cb_password (str): Password for Couchbase authentication. + couchbase_bucket (str): Couchbase bucket to store data. + cb_scope (str, optional): Couchbase scope name. + cb_collection (str, optional): Couchbase collection name. + id_fields (str): Comma-separated list of field names to use as document ID. + name (str, optional): Configuration name of the dataset. + data_dir (str, optional): Directory with the data files. + data_files (Union[Dict, List, str, None], optional): Path(s) to source data file(s). + split (str, optional): Which split of the data to load. + cache_dir (str, optional): Cache directory to store the datasets. + features (Optional[Features], optional): Set of features to use. + download_config (Optional[Dict], optional): Specific download configuration parameters. + download_mode (Union[DownloadMode, str, None], optional): Specifies the download mode. + verification_mode (str, optional): Verification mode. + keep_in_memory (bool, optional): Whether to keep the dataset in memory. + save_infos (bool, default=False): Whether to save dataset information. + revision (Union[str, Version, None], optional): Version of the dataset script to load. + streaming (bool, default=False): Whether to load the dataset in streaming mode. + num_proc (int, optional): Number of processes to use. 
+ storage_options (Dict, optional): Storage options for remote filesystems. + trust_remote_code (bool, optional): Allow loading arbitrary code from the dataset repository. + config_kwargs (Any): Additional keyword arguments for dataset configuration. + + Returns: + bool: True if migration is successful, False otherwise. + """ + try: + # Include token if provided + if self.token: + config_kwargs['token'] = self.token + print(config_kwargs) + # Prepare parameters for load_dataset + load_kwargs = { + 'path': path, + 'name': name, + 'data_dir': data_dir, + 'data_files': data_files, + 'split': split, + 'cache_dir': cache_dir, + #'features': features, + 'download_config': DownloadConfig(**download_config) if download_config else None, + 'download_mode': download_mode, + 'verification_mode': verification_mode, + 'keep_in_memory': keep_in_memory, + 'save_infos': save_infos, + 'revision': revision, + 'streaming': streaming, + 'num_proc': num_proc, + 'storage_options': storage_options, + 'trust_remote_code': trust_remote_code, + **config_kwargs + } + + # Remove None values + load_kwargs = {k: v for k, v in load_kwargs.items() if v is not None} + + # Parse id_fields into a list + if id_fields: + id_fields_list = [field.strip() for field in id_fields.split(',') if field.strip()] + if not id_fields_list: + id_fields_list = None + else: + id_fields_list = None + + # Load the dataset from Hugging Face + dataset = load_dataset(**load_kwargs) + + # Establish Couchbase connection + self.connect(cb_url, cb_username, cb_password, couchbase_bucket, cb_scope, cb_collection) + + total_records = 0 + + # Function to construct document ID + def construct_doc_id(example): + if id_fields_list: + try: + id_values = [str(example[field]) for field in id_fields_list] + doc_id = '_'.join(id_values) + return doc_id + except KeyError as e: + raise ValueError(f"Field '{e.args[0]}' not found in the dataset examples.") + else: + return str(uuid.uuid4()) + + # If dataset is a dict (multiple splits), iterate over each split + if isinstance(dataset, (DatasetDict, IterableDatasetDict)): + for split_name, split_dataset in dataset.items(): + print(f"Processing split '{split_name}'...") + batch = {} + for example in split_dataset: + doc_id = construct_doc_id(example) + # Include split name in the document + example_with_split = dict(example) + example_with_split['split'] = split_name + batch[doc_id] = example_with_split + total_records += 1 + + # Batch insert every 1000 documents + if len(batch) >= 1000: + self.insert_multi(batch) + batch.clear() + print(f"{total_records} records migrated...") + # Insert remaining documents in batch + if batch: + self.insert_multi(batch) + print(f"{total_records} records migrated...") + batch.clear() + else: + # Dataset is a single split + split_name = str(split) if split else 'unspecified' + print(f"Processing split '{split_name}'...") + batch = {} + for example in dataset: + doc_id = construct_doc_id(example) + example_with_split = dict(example) + example_with_split['split'] = split_name + batch[doc_id] = example_with_split + total_records += 1 + + # Batch insert every 1000 documents + if len(batch) >= 1000: + self.insert_multi(batch) + batch.clear() + print(f"{total_records} records migrated...") + # Insert remaining documents in batch + if batch: + self.insert_multi(batch) + print(f"{total_records} records migrated...") + batch.clear() + + print(f"Total records migrated: {total_records}") + return True + except Exception as e: + logger.error(f"An error occurred during migration: {e}") + return 
False + finally: + self.close() + + def insert_multi(self, batch): + """ + Performs a batch insert operation using insert_multi. + + :param batch: A dictionary where keys are document IDs and values are documents + """ + try: + result: MultiMutationResult = self.collection.insert_multi(batch) + except Exception as e: + logger.error(f"Write error: {e}") + msg = f"Failed to write documents to Couchbase. Error: {e}" + raise Exception(msg) from e + + if not result.all_ok and result.exceptions: + duplicate_ids = [] + other_errors = [] + for doc_id, ex in result.exceptions.items(): + if isinstance(ex, DocumentExistsException): + duplicate_ids.append(doc_id) + else: + other_errors.append({"id": doc_id, "exception": ex}) + if duplicate_ids: + msg = f"IDs '{', '.join(duplicate_ids)}' already exist in the document store." + raise Exception(msg) + if other_errors: + msg = f"Failed to write documents to Couchbase. Errors:\n{other_errors}" + raise Exception(msg) \ No newline at end of file diff --git a/hf_to_cb_dataset_migrator/utils.py b/hf_to_cb_dataset_migrator/utils.py new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5eaae36 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +click==8.1.7 +couchbase==4.3.2 +datasets==3.0.1 \ No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..520b14f --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,98 @@ +# tests/test_cli.py + +import unittest +from click.testing import CliRunner +from unittest.mock import patch, MagicMock +import json +from hf_to_cb_dataset_migrator.cli import main + +class TestCLI(unittest.TestCase): + @patch('my_cli.cli.DatasetMigrator') + def test_migrate_with_all_parameters(self, mock_migrator_class): + # Mock the migrator instance + mock_migrator = mock_migrator_class.return_value + mock_migrator.migrate_dataset.return_value = True + + runner = CliRunner() + result = runner.invoke(main, [ + 'migrate', + '--path', 'dataset', + '--name', 'config1', + '--data-dir', '/path/to/data_dir', + '--data-files', '/path/to/file1', '/path/to/file2', + '--split', 'train', + '--cache-dir', '/path/to/cache', + '--features', 'your_features', + '--download-config', 'your_download_config', + '--download-mode', 'force_redownload', + '--verification-mode', 'all_checks', + '--keep-in-memory', + '--save-infos', + '--revision', 'main', + '--token', 'your_token', + '--no-streaming', + '--num-proc', '4', + '--storage-options', '{"option_key": "option_value"}', + '--trust-remote-code', + '--id-fields', 'id,title', + '--cb-url', 'couchbase://localhost', + '--cb-username', 'username', + '--cb-password', 'password', + '--cb-bucket', 'bucket', + '--cb-scope', 'scope', + '--cb-collection', 'collection', + '--api-key', 'api_key_value', + '--config-kwargs', 'key1=value1', 'key2=value2' + ]) + self.assertEqual(result.exit_code, 0) + mock_migrator_class.assert_called_with(api_key='api_key_value') + mock_migrator.migrate_dataset.assert_called_once() + self.assertIn('Migration completed successfully.', result.output) + + @patch('my_cli.cli.DatasetMigrator') + def test_migrate_invalid_config(self, mock_migrator_class): + # Mock the get_dataset_config_names function + with patch('my_cli.cli.get_dataset_config_names') as mock_get_configs: + mock_get_configs.return_value = ['config1', 'config2'] + mock_migrator = mock_migrator_class.return_value + + runner = CliRunner() + result = runner.invoke(main, [ + 'migrate', + '--path', 'dataset', + 
'--name', 'invalid_config', + '--cb-url', 'couchbase://localhost', + '--cb-username', 'user', + '--cb-password', 'pass', + '--cb-bucket', 'bucket' + ]) + self.assertEqual(result.exit_code, 0) + mock_get_configs.assert_called_once() + self.assertIn("Invalid configuration name 'invalid_config'. Available configurations are: ['config1', 'config2']", result.output) + mock_migrator.migrate_dataset.assert_not_called() + + @patch('my_cli.cli.DatasetMigrator') + def test_migrate_invalid_split(self, mock_migrator_class): + # Mock the get_dataset_config_names and get_dataset_split_names functions + with patch('my_cli.cli.get_dataset_config_names') as mock_get_configs, \ + patch('my_cli.cli.get_dataset_split_names') as mock_get_splits: + mock_get_configs.return_value = ['config1'] + mock_get_splits.return_value = ['train', 'test'] + mock_migrator = mock_migrator_class.return_value + + runner = CliRunner() + result = runner.invoke(main, [ + 'migrate', + '--path', 'dataset', + '--name', 'config1', + '--split', 'invalid_split', + '--cb-url', 'couchbase://localhost', + '--cb-username', 'user', + '--cb-password', 'pass', + '--cb-bucket', 'bucket' + ]) + self.assertEqual(result.exit_code, 0) + mock_get_configs.assert_called_once() + mock_get_splits.assert_called_once() + self.assertIn("Invalid split name 'invalid_split'. Available splits are: ['train', 'test']", result.output) + mock_migrator.migrate_dataset.assert_not_called() \ No newline at end of file diff --git a/tests/test_migration.py b/tests/test_migration.py new file mode 100644 index 0000000..0634891 --- /dev/null +++ b/tests/test_migration.py @@ -0,0 +1,61 @@ +# tests/test_migration.py + +import unittest +from unittest.mock import patch, MagicMock +from hf_to_cb_dataset_migrator.migration import DatasetMigrator +from datasets import DatasetDict, Dataset, IterableDatasetDict +from datasets.features import Features + +class TestDatasetMigrator(unittest.TestCase): + @patch('my_cli.migration.load_dataset') + @patch('my_cli.migration.get_dataset_config_names') + @patch('my_cli.migration.get_dataset_split_names') + @patch('my_cli.migration.DatasetMigrator.connect') + @patch('my_cli.migration.DatasetMigrator.close') + @patch('my_cli.migration.DatasetMigrator.insert_multi') + def test_migrate_dataset_with_all_parameters( + self, mock_insert_multi, mock_close, mock_connect, mock_get_splits, mock_get_configs, mock_load_dataset + ): + # Setup the mock return values + mock_get_configs.return_value = ['config1', 'config2'] + mock_get_splits.return_value = ['train', 'test'] + mock_dataset = MagicMock(spec=DatasetDict) + mock_dataset.items.return_value = [('train', [{'id': 1, 'text': 'sample'}])] + mock_load_dataset.return_value = mock_dataset + + migrator = DatasetMigrator(api_key='api_key_value') + result = migrator.migrate_dataset( + path='dataset', + cb_url='couchbase://localhost', + cb_username='user', + cb_password='pass', + couchbase_bucket='bucket', + cb_scope='scope', + cb_collection='collection', + id_fields='id', + name='config1', + data_dir='/path/to/data_dir', + data_files=['/path/to/file1', '/path/to/file2'], + split='train', + cache_dir='/path/to/cache', + features=Features({'text': 'string'}), + download_config='download_config_value', + download_mode='force_redownload', + verification_mode='all_checks', + keep_in_memory=True, + save_infos=True, + revision='main', + token='token_value', + streaming=False, + num_proc=4, + storage_options={'option_key': 'option_value'}, + trust_remote_code=True, + custom_arg='custom_value' + ) + 
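+        # With every external dependency mocked, a successful run should return True and
+        # touch each collaborator: config/split lookup, dataset loading, the Couchbase
+        # connection, batched insert_multi writes, and the final close().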
self.assertTrue(result) + mock_get_configs.assert_called_once() + mock_get_splits.assert_called_once() + mock_load_dataset.assert_called_once() + mock_connect.assert_called_once() + mock_insert_multi.assert_called() + mock_close.assert_called_once() \ No newline at end of file
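For completeness, here is a minimal sketch of driving the migration from Python rather than through the CLI, using the `DatasetMigrator` API added in this patch. The dataset name, credentials, bucket, scope, and collection values are placeholders; the calls shown (`list_configs`, `list_splits`, `migrate_dataset`) follow the signatures in `migration.py`.

```python
from hf_to_cb_dataset_migrator.migration import DatasetMigrator

# A token is only required for private datasets; all values below are placeholders.
migrator = DatasetMigrator(token=None)

# Inspect what the dataset offers before migrating.
configs = migrator.list_configs("imdb")
splits = migrator.list_splits("imdb", config_name=configs[0] if configs else None)
print(configs, splits)

# Stream the train split into Couchbase. id_fields assumes the dataset has an "id"
# column; leave it as None to key documents on generated UUIDs instead.
ok = migrator.migrate_dataset(
    path="imdb",
    cb_url="couchbase://localhost",
    cb_username="Administrator",
    cb_password="password",
    couchbase_bucket="hf_data",
    cb_scope="_default",
    cb_collection="_default",
    id_fields=None,
    split="train",
    streaming=True,
)
print("Migration completed" if ok else "Migration failed")
```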