import dataclasses
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional
import numpy as np
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer, EvalPrediction
from transformers import GlueDataTrainingArguments as DataTrainingArguments
from transformers import (
HfArgumentParser,
Trainer,
TrainingArguments,
set_seed,
)
from data_processor import glue_output_modes, glue_tasks_num_labels
from utils_re import REDataset, get_eval_results
from metrics import glue_compute_metrics
# Module-level logger; its level and format are set by logging.basicConfig() inside main().
logger = logging.getLogger(__name__)
@dataclass
class ModelArguments:
    """
    Command-line options describing the pretrained model, config and tokenizer
    to fine-tune from, plus the learning-rate warmup proportion.

    Each field's ``help`` metadata is the text exposed by the argument parser.
    """

    # Required: this is the only field without a default value.
    model_name_or_path: str = field(
        metadata=dict(help="Path to pretrained model or model identifier from huggingface.co/models"),
    )

    # Optional overrides; None means "not provided".
    config_name: Optional[str] = field(
        default=None,
        metadata=dict(help="Pretrained config name or path if not the same as model_name"),
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata=dict(help="Pretrained tokenizer name or path if not the same as model_name"),
    )
    cache_dir: Optional[str] = field(
        default=None,
        metadata=dict(help="Where do you want to store the pretrained models downloaded from s3"),
    )

    # Fraction of training used for linear warmup; converted to a step count in main().
    warmup_proportion: Optional[float] = field(
        default=0.1,
        metadata=dict(
            help="Proportion of training to perform linear learning rate warmup for. E.g., 0.1 = 10% of training."
        ),
    )
def main() -> Dict:
    """Fine-tune, evaluate and/or run prediction with a sequence-classification model.

    Arguments are parsed into ``ModelArguments``, ``DataTrainingArguments`` and
    ``TrainingArguments``. They come either from a single ``.json`` file given
    as the only command-line argument, or from regular command-line flags.

    Depending on ``do_train`` / ``do_eval`` / ``do_predict`` the function:

    * trains the model and saves model + tokenizer to ``output_dir``;
    * evaluates on the dev split and writes ``eval_results_<task>.txt``;
    * predicts on the test split, writes ``test_predictions.txt``, scores it
      against ``<data_dir>/test_labels.tsv`` via ``get_eval_results`` and
      writes ``test_results.txt``.

    Returns:
        The metrics returned by ``trainer.evaluate``; an empty dict when
        evaluation was not requested.

    Raises:
        ValueError: if ``output_dir`` already exists, is non-empty, and
            training is requested without ``overwrite_output_dir``; or if the
            task name is not a key of ``glue_tasks_num_labels`` /
            ``glue_output_modes``.
    """
    parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
    if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
        # If we pass only one argument to the script and it's the path to a json file,
        # let's parse it to get our arguments.
        model_args, data_args, training_args = parser.parse_json_file(json_file=os.path.abspath(sys.argv[1]))
    else:
        model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    # Refuse to clobber an existing, non-empty output directory when training.
    if (
        os.path.exists(training_args.output_dir)
        and os.listdir(training_args.output_dir)
        and training_args.do_train
        and not training_args.overwrite_output_dir
    ):
        raise ValueError(
            f"Output directory ({training_args.output_dir}) already exists and is not empty. Use --overwrite_output_dir to overcome."
        )

    # Setup logging
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO if training_args.local_rank in [-1, 0] else logging.WARN,
    )
    logger.warning(
        "Process rank: %s, device: %s, n_gpu: %s, distributed training: %s, 16-bits training: %s",
        training_args.local_rank,
        training_args.device,
        training_args.n_gpu,
        bool(training_args.local_rank != -1),
        training_args.fp16,
    )

    # Set seed
    set_seed(training_args.seed)

    try:
        num_labels = glue_tasks_num_labels[data_args.task_name]
        output_mode = glue_output_modes[data_args.task_name]
    except KeyError as err:
        # Chain the KeyError so the original lookup failure stays in the traceback.
        raise ValueError("Task not found: %s" % data_args.task_name) from err

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(
        model_args.tokenizer_name if model_args.tokenizer_name else model_args.model_name_or_path,
        cache_dir=model_args.cache_dir,
    )

    # Get datasets (each split is only built when the matching phase is requested)
    train_dataset = (
        REDataset(data_args, tokenizer=tokenizer, cache_dir=model_args.cache_dir) if training_args.do_train else None
    )
    eval_dataset = (
        REDataset(data_args, tokenizer=tokenizer, mode="dev", cache_dir=model_args.cache_dir)
        if training_args.do_eval
        else None
    )
    test_dataset = (
        REDataset(data_args, tokenizer=tokenizer, mode="test", cache_dir=model_args.cache_dir)
        if training_args.do_predict
        else None
    )

    # Load pretrained model
    # Distributed training:
    # The .from_pretrained methods guarantee that only one local process can concurrently
    # download model & vocab.
    # Currently, this code does not support distributed training.
    if training_args.do_train:
        # Convert the warmup proportion into a step count. This is only
        # meaningful (and train_dataset is only non-None) when training.
        # NOTE(review): the estimate uses the per-device batch size only; it does
        # not factor in gradient accumulation or multiple devices - confirm.
        training_args.warmup_steps = int(
            model_args.warmup_proportion
            * (len(train_dataset) / training_args.per_device_train_batch_size)
            * training_args.num_train_epochs
        )
    logger.info("Training/evaluation parameters %s", training_args)

    config = AutoConfig.from_pretrained(
        model_args.config_name if model_args.config_name else model_args.model_name_or_path,
        num_labels=num_labels,
        finetuning_task=data_args.task_name,
        cache_dir=model_args.cache_dir,
    )
    model = AutoModelForSequenceClassification.from_pretrained(
        model_args.model_name_or_path,
        from_tf=bool(".ckpt" in model_args.model_name_or_path),
        config=config,
        cache_dir=model_args.cache_dir,
    )

    def build_compute_metrics_fn(task_name: str) -> Callable[[EvalPrediction], Dict]:
        """Return a metrics callback for the Trainer (``task_name`` is currently unused)."""

        def compute_metrics_fn(p: EvalPrediction):
            # Pick the highest-scoring class along axis 1 for each example.
            preds = np.argmax(p.predictions, axis=1)
            return glue_compute_metrics(preds, p.label_ids)

        return compute_metrics_fn

    # Initialize our Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=build_compute_metrics_fn(data_args.task_name),
    )

    # Training
    if training_args.do_train:
        trainer.train(
            model_path=model_args.model_name_or_path if os.path.isdir(model_args.model_name_or_path) else None
        )
        trainer.save_model()
        # For convenience, we also re-save the tokenizer to the same directory,
        # so that you can share your model easily on huggingface.co/models =)
        if trainer.is_world_master():
            tokenizer.save_pretrained(training_args.output_dir)

    # Evaluation
    eval_results = {}
    if training_args.do_eval:
        logger.info("*** Evaluate ***")

        # Single-element list: the loop shape is kept so more dev sets can be added.
        for dev_set in [eval_dataset]:
            task_name = dev_set.args.task_name
            trainer.compute_metrics = build_compute_metrics_fn(task_name)
            eval_result = trainer.evaluate(eval_dataset=dev_set)

            output_eval_file = os.path.join(training_args.output_dir, f"eval_results_{task_name}.txt")
            if trainer.is_world_master():
                with open(output_eval_file, "w") as writer:
                    logger.info("***** Eval results %s *****", task_name)
                    for key, value in eval_result.items():
                        logger.info(" %s = %s", key, value)
                        writer.write("%s = %s\n" % (key, value))

            eval_results.update(eval_result)

    # Prediction
    if training_args.do_predict:
        logger.info("*** Test ***")

        for test_set in [test_dataset]:
            predictions = trainer.predict(test_dataset=test_set).predictions
            if output_mode == "classification":
                predictions = np.argmax(predictions, axis=1)

            output_test_file = os.path.join(training_args.output_dir, "test_predictions.txt")
            if trainer.is_world_master():
                # Fetch the label list once instead of once per prediction.
                label_list = test_set.get_labels()
                with open(output_test_file, "w") as writer:
                    logger.info("***** Test results %s *****", test_set.args.task_name)
                    writer.write("index\tprediction\n")
                    for index, item in enumerate(predictions):
                        writer.write("%d\t%s\n" % (index, label_list[item]))

                # Score the predictions only after the file above has been closed,
                # so get_eval_results reads the fully written file.
                output_label_file = os.path.join(data_args.data_dir, "test_labels.tsv")
                output_test_result_file = os.path.join(training_args.output_dir, "test_results.txt")
                test_result = get_eval_results(output_label_file, output_test_file)
                with open(output_test_result_file, "w") as writer:
                    for key, value in test_result.items():
                        logger.info(" %s = %s", key, value)
                        writer.write("%s = %s\n" % (key, value))

    return eval_results
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()