Transformers fine-tuning script doesn't work with FSDP


I am trying to use FSDP with Accelerate and I seem to keep running into the same error:

TypeError: FullyShardedDataParallel.__init__() got an unexpected keyword argument 'ignored_parameters'

I run it with NCCL_P2P_DISABLE=1 accelerate launch new.py; the full error log is at the bottom. The script works fine without FSDP. Below is my code, followed by the Accelerate YAML config files.
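
For reference, the same run with the FSDP config passed explicitly would be NCCL_P2P_DISABLE=1 accelerate launch --config_file fsdp_config.yaml new.py, where fsdp_config.yaml is just a placeholder name for the FSDP config shown further down.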

import html
import json
import os
from time import time

import pandas as pd
import torch
from accelerate import Accelerator
from datasets import Dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    get_linear_schedule_with_warmup,
    set_seed,
)


def process_dataset(json_file):
    with open(json_file, "r", encoding="utf-8") as file:
        data = json.load(file)

    def concatenate_messages(group):
        conversation = ''
        for _, row in group.iterrows():
            if row['PCP_MESSAGE'].strip():
                conversation += 'PCP: ' + row['PCP_MESSAGE'].strip() + '\n'
            if row['SR_MESSAGE'].strip():
                conversation += 'SR: ' + row['SR_MESSAGE'].strip() + '\n'
        return conversation

    df = pd.DataFrame(data)
    df['PCP_MESSAGE'] = df['PCP_MESSAGE'].fillna('')
    df['SR_MESSAGE'] = df['SR_MESSAGE'].fillna('')
    df['PCP_MESSAGE'] = df['PCP_MESSAGE'].apply(html.unescape)
    df['SR_MESSAGE'] = df['SR_MESSAGE'].apply(html.unescape)
    df.sort_values(by=['CONSULTID', 'CREATED'], inplace=True)

    grouped = df.groupby('CONSULTID').apply(
        concatenate_messages).reset_index(name='conversation')

    return grouped


def get_dataloaders(accelerator: Accelerator, batch_size: int = 16):
    tokenizer = AutoTokenizer.from_pretrained("gpt2-xl")
    tokenizer.pad_token = tokenizer.eos_token

    dataset = Dataset.from_pandas(process_dataset("./asdf.json"))

    def tokenize_function(examples):
        return tokenizer(examples["conversation"], padding="max_length", truncation=True, max_length=128)

    with accelerator.main_process_first():
        tokenized_dataset = dataset.map(tokenize_function, batched=True)

    tokenized_dataset.set_format(
        'torch', columns=['input_ids', 'attention_mask'])

    train_size = int(0.8 * len(tokenized_dataset))
    test_size = len(tokenized_dataset) - train_size
    tokenized_train_dataset, tokenized_eval_dataset = torch.utils.data.random_split(
        tokenized_dataset, [train_size, test_size])

    # Instantiate dataloaders.
    train_dataloader = DataLoader(
        tokenized_train_dataset,
        shuffle=True,
        batch_size=batch_size,
        drop_last=True
    )

    eval_dataloader = DataLoader(
        tokenized_eval_dataset,
        shuffle=False,
        batch_size=batch_size*2,
        drop_last=(accelerator.mixed_precision == "fp8"),
    )

    return train_dataloader, eval_dataloader


def training_function():
    # Initialize accelerator
    accelerator = Accelerator(mixed_precision="fp16")
    # Sample hyper-parameters for learning rate, batch size, seed and a few other HPs
    lr = 5e-5
    num_epochs = 3
    seed = 42
    batch_size = 1
    num_warmup_steps = 100

    set_seed(seed)

    train_dataloader, eval_dataloader = get_dataloaders(
        accelerator, batch_size)

    # Instantiate the model (we build the model here so that the seed also controls the new weight initialization)
    model = AutoModelForCausalLM.from_pretrained("gpt2")
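    # With FSDP, Accelerate recommends preparing the model before creating the
    # optimizer, so the optimizer is built over the sharded parameters.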
    model = accelerator.prepare(model)

    optimizer = AdamW(params=model.parameters(), lr=lr)

    # Instantiate scheduler
    lr_scheduler = get_linear_schedule_with_warmup(
        optimizer=optimizer,
        num_warmup_steps=num_warmup_steps,
        num_training_steps=(len(train_dataloader) *
                            num_epochs),
    )

    # Prepare everything
    # There is no specific order to remember; we just need to unpack the objects in
    # the same order we gave them to the prepare method.
    optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
        optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # Initialize logging variables
    total_train_loss = 0
    total_eval_loss = 0

    # Now we train the model
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for batch in tqdm(train_dataloader, desc="Training"):
            with accelerator.accumulate(model):
                # Process the batch
                inputs = {k: v.to(accelerator.device)
                          for k, v in batch.items()}
                if "labels" not in inputs:
                    inputs["labels"] = inputs["input_ids"]

                outputs = model(**inputs)
                loss = outputs.loss
                total_train_loss += loss.item()
                accelerator.backward(loss)
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

        # Evaluation loop after each training epoch
        model.eval()
        total_eval_loss = 0
        for batch in tqdm(eval_dataloader, "Evaluating"):
            with torch.no_grad():
                inputs = {k: v.to(accelerator.device)
                          for k, v in batch.items()}
                if "labels" not in inputs:
                    inputs["labels"] = inputs["input_ids"]

                outputs = model(**inputs)
                loss = outputs.loss
                total_eval_loss += loss.item()

        # Log the average losses
        avg_train_loss = total_train_loss / len(train_dataloader)
        avg_eval_loss = total_eval_loss / len(eval_dataloader)
        print(
            f"Epoch: {epoch}, Average Training Loss: {avg_train_loss}, Average Evaluation Loss: {avg_eval_loss}")

    # You can also include other evaluation metrics here if needed
    if accelerator.is_main_process:
        output_dir = "./accExModel"
        os.makedirs(output_dir, exist_ok=True)
        # Check if the model is wrapped with DDP and save accordingly
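        # (Under FSDP the wrapper is FullyShardedDataParallel rather than DDP, so this
        # check would not trigger; in my case the error below happens earlier, during
        # accelerator.prepare, so execution never reaches this point.)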
        if isinstance(model, torch.nn.parallel.DistributedDataParallel):
            model.module.save_pretrained(output_dir)
        else:
            model.save_pretrained(output_dir)
        print(f"Model saved to {output_dir}")


def main():
    training_function()


if __name__ == "__main__":
    start = time()
    main()
    print(f"Total Execution Time: {time() - start} seconds")

Here is the YAML config file that doesn't use FSDP (this configuration works fine):

compute_environment: LOCAL_MACHINE
distributed_type: MULTI_GPU
downcast_bf16: 'no'
gpu_ids: all
machine_rank: 0
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 6
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false

Here is the YAML config file that uses FSDP (this is the one that fails):

compute_environment: LOCAL_MACHINE
distributed_type: FSDP
downcast_bf16: 'no'
fsdp_config:
  fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP
  fsdp_backward_prefetch_policy: BACKWARD_PRE
  fsdp_forward_prefetch: false
  fsdp_offload_params: false
  fsdp_sharding_strategy: 2
  fsdp_state_dict_type: SHARDED_STATE_DICT
  fsdp_sync_module_states: false
  fsdp_transformer_layer_cls_to_wrap: GPT2Block
  fsdp_use_orig_params: false
machine_rank: 0
main_training_function: main
mixed_precision: fp16
num_machines: 1
num_processes: 6
rdzv_backend: static
same_network: true
tpu_env: []
tpu_use_cluster: false
tpu_use_sudo: false
use_cpu: false
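
For reference, I believe the rough in-code equivalent of this FSDP config would be the sketch below (untested, and the plugin field names are taken from the Accelerate docs, so they may differ between versions):

from accelerate import Accelerator, FullyShardedDataParallelPlugin
from torch.distributed.fsdp import BackwardPrefetch, ShardingStrategy

# Sketch only: mirrors fsdp_sharding_strategy: 2 (SHARD_GRAD_OP) and the
# backward prefetch / use_orig_params / sync_module_states settings above.
fsdp_plugin = FullyShardedDataParallelPlugin(
    sharding_strategy=ShardingStrategy.SHARD_GRAD_OP,
    backward_prefetch=BackwardPrefetch.BACKWARD_PRE,
    use_orig_params=False,
    sync_module_states=False,
)
accelerator = Accelerator(mixed_precision="fp16", fsdp_plugin=fsdp_plugin)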

This is the error I get:

Traceback (most recent call last):
  File "/home/username/test/new.py", line 183, in <module>
    main()
  File "/home/username/test/new.py", line 178, in main
    training_function()
  File "/home/username/test/new.py", line 102, in training_function
    model = accelerator.prepare(model)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1202, in prepare
    result = tuple(
             ^^^^^^
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1203, in <genexpr>
    self._prepare_one(obj, first_pass=True, device_placement=d) for obj, d in zip(args, device_placement)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1030, in _prepare_one
    return self.prepare_model(obj, device_placement=device_placement)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/accelerator.py", line 1366, in prepare_model
    model = FSDP(model, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^
TypeError: FullyShardedDataParallel.__init__() got an unexpected keyword argument 'ignored_parameters'
[2024-01-17 19:07:24,361] torch.distributed.elastic.multiprocessing.api: [ERROR] failed (exitcode: 1) local_rank: 0 (pid: 8982) of binary: /home/username/miniconda3/bin/python
Traceback (most recent call last):
  File "/home/username/miniconda3/bin/accelerate", line 11, in <module>
    sys.exit(main())
             ^^^^^^
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/accelerate_cli.py", line 45, in main
    args.func(args)
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/launch.py", line 966, in launch_command
    multi_gpu_launcher(args)
  File "/home/username/miniconda3/lib/python3.11/site-packages/accelerate/commands/launch.py", line 646, in multi_gpu_launcher
    distrib_run.run(args)
  File "/home/username/miniconda3/lib/python3.11/site-packages/torch/distributed/run.py", line 797, in run
    elastic_launch(
  File "/home/username/miniconda3/lib/python3.11/site-packages/torch/distributed/launcher/api.py", line 134, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^