"""Convert EHR examples into ``InputFeatures`` for the ``ehr-re`` task."""

import logging
import os
from enum import Enum
from typing import List, Optional, Union

from transformers import (
    DataProcessor,
    InputExample,
    InputFeatures,
    PreTrainedTokenizer,
)


logger = logging.getLogger(__name__)


def glue_convert_examples_to_features(
    examples: List[InputExample],
    tokenizer: PreTrainedTokenizer,
    max_length: Optional[int] = None,
    task=None,
    label_list=None,
    output_mode=None,
):
    """
    Convert a list of ``InputExample`` objects into a list of ``InputFeatures``.

    Args:
        examples: List of ``InputExample`` objects to convert.
        tokenizer: Instance of a tokenizer that will tokenize the examples.
        max_length: Maximum example length. Defaults to the tokenizer's
            maximum length.
        task: Task name; must be a key of ``glue_processors`` when given.
        label_list: List of labels. Can be obtained from the processor using
            the ``processor.get_labels()`` method. Looked up from ``task``
            when omitted.
        output_mode: String indicating the output mode. Looked up from
            ``task`` when omitted.

    Returns:
        A list with one ``InputFeatures`` per example, in input order.

    Raises:
        ValueError: If neither ``label_list`` nor ``task`` is given, so no
            label mapping can be built.
    """
    return _glue_convert_examples_to_features(
        examples,
        tokenizer,
        max_length=max_length,
        task=task,
        label_list=label_list,
        output_mode=output_mode,
    )


def _default_max_length(tokenizer):
    """Return the tokenizer's maximum sequence length across library versions."""
    # ``max_len`` was deprecated in favour of ``model_max_length``; prefer the
    # current attribute and only fall back to the legacy one for tokenizers
    # that do not expose it.
    limit = getattr(tokenizer, "model_max_length", None)
    if limit is None:
        limit = tokenizer.max_len
    return limit


def _glue_convert_examples_to_features(
    examples: List[InputExample],
    tokenizer: PreTrainedTokenizer,
    max_length: Optional[int] = None,
    task=None,
    label_list=None,
    output_mode=None,
):
    """Tokenize ``examples`` in one batch and pair each encoding with its label id.

    See ``glue_convert_examples_to_features`` for the parameter contract.
    """
    if max_length is None:
        max_length = _default_max_length(tokenizer)

    if task is not None:
        processor = glue_processors[task]()
        if label_list is None:
            label_list = processor.get_labels()
            logger.info("Using label list %s for task %s", label_list, task)
        if output_mode is None:
            output_mode = glue_output_modes[task]
            logger.info("Using output mode %s for task %s", output_mode, task)

    if label_list is None:
        # Without this guard the failure surfaces as an opaque
        # "'NoneType' object is not iterable" from enumerate().
        raise ValueError(
            "label_list could not be determined: pass `label_list` explicitly "
            "or provide a `task` registered in `glue_processors`."
        )

    # Nothing to tokenize; avoid handing an empty batch to the tokenizer.
    if not examples:
        return []

    label_map = {label: i for i, label in enumerate(label_list)}

    def label_from_example(example: InputExample) -> Union[int, float, None]:
        # Unlabelled examples (e.g. the test split) keep ``None`` as label.
        if example.label is None:
            return None
        return label_map[example.label]

    labels = [label_from_example(example) for example in examples]

    # Every sequence is padded/truncated to exactly ``max_length`` tokens.
    batch_encoding = tokenizer(
        [[example.text_a, example.text_b] for example in examples],
        max_length=max_length,
        padding="max_length",
        truncation=True,
    )

    # The batch encoding maps each field name to a per-example list; slice out
    # position ``i`` of every field to build the i-th feature.
    features = [
        InputFeatures(
            **{key: batch_encoding[key][i] for key in batch_encoding},
            label=label,
        )
        for i, label in enumerate(labels)
    ]

    # Log the first few examples as a sanity check.
    for example, feature in zip(examples[:5], features):
        logger.info("*** Example ***")
        logger.info("guid: %s", example.guid)
        logger.info("features: %s", feature)

    return features


class OutputMode(Enum):
    """Output modes available for the tasks in this module."""

    classification = "classification"


class EHRProcessor(DataProcessor):
    """Processor for EHR data."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get_example_from_tensor_dict(self, tensor_dict):
        """See base class."""
        return InputExample(
            tensor_dict["idx"].numpy(),
            tensor_dict["sentence"].numpy().decode("utf-8"),
            None,
            str(tensor_dict["label"].numpy()),
        )

    def get_train_examples(self, data_dir):
        """See base class."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "train.tsv")), "train")

    def get_dev_examples(self, data_dir):
        """See base class."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "dev.tsv")), "dev")

    def get_test_examples(self, data_dir):
        """See base class."""
        return self._create_examples(self._read_tsv(os.path.join(data_dir, "test.tsv")), "test")

    def get_labels(self):
        """See base class."""
        return ["0", "1"]

    def _create_examples(self, lines, set_type):
        """Creates examples for the training, dev and test sets.

        Args:
            lines: Parsed rows of the split's TSV file; the first row is
                skipped as a header.
            set_type: ``"train"``, ``"dev"`` or ``"test"``; used as the guid
                prefix and to select the column layout.

        Returns:
            A list of ``InputExample`` with ``text_b`` always ``None``.
        """
        is_test = set_type == "test"
        # Test rows carry the text in column 1 and have no label; train/dev
        # rows carry the text in column 0 and the label in column 1.
        text_index = 1 if is_test else 0
        examples = []
        for i, line in enumerate(lines):
            if i == 0:
                # Header row.
                continue
            guid = "%s-%s" % (set_type, i)
            label = None if is_test else line[1]
            examples.append(
                InputExample(guid=guid, text_a=line[text_index], text_b=None, label=label)
            )
        return examples


# Task registries, keyed by task name.
glue_tasks_num_labels = {"ehr-re": 2}

glue_processors = {"ehr-re": EHRProcessor}

glue_output_modes = {"ehr-re": "classification"}