<a href="https://colab.research.google.com/github/sAndreotti/MedicalMeadow/blob/main/few-shot" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Hugging Face Hub id of the checkpoint; used below to load the tokenizer and both models.
MODEL = "meta-llama/Llama-3.2-1B-Instruct"

In [None]:
!pip install datasets accelerate peft bitsandbytes transformers trl==0.12.0 plotly
!pip install --upgrade smart_open
!pip install --upgrade gensim

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import torch
from transformers import (
    BitsAndBytesConfig,
)
from transformers import TrainingArguments
from trl import SFTTrainer

from datasets import load_dataset
import numpy as np
from collections import Counter
from peft import LoraConfig

## Investigate Dataset

In [None]:
# Fetch the medical flashcards corpus and keep only its "train" split.
ds = load_dataset("medalpaca/medical_meadow_medical_flashcards")["train"]
ds  # last expression of the cell: shows the dataset summary

#### Create dataset

In [None]:
from torch.utils.data import Dataset

class MedDataset(Dataset):
    """Index-aligned (instruction, input, output) columns rendered as prompt strings.

    Indexing returns one string laid out as
    ``<s>[INST] {instruction}. {input} [/INST] {output} </s>``.
    """

    def __init__(self, instruction, input, output):
        # The three parallel sequences are stored as given (no copy, no validation).
        self.instruction, self.input, self.output = instruction, input, output

    def __len__(self):
        # The instruction column alone determines the reported size.
        return len(self.instruction)

    def __getitem__(self, idx):
        # Like "+" concatenation, str.join raises TypeError if a field is not a string.
        pieces = (
            "<s>[INST] ",
            self.instruction[idx],
            ". ",
            self.input[idx],
            " [/INST] ",
            self.output[idx],
            " </s>",
        )
        return "".join(pieces)

In [None]:
!pip install huggingface_hub

# Load model directly
import os

from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import login

# SECURITY: an access token must never be committed with the notebook.
# The token is read from the HF_TOKEN environment variable; when it is unset,
# login() is called without arguments so it can prompt for the token instead.
# Any token that was previously hard-coded in this cell should be revoked.
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    login(token=hf_token)
else:
    login()

tokenizer = AutoTokenizer.from_pretrained(MODEL)
# Reuse the end-of-sequence token for padding and pad on the right.
tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"

In [None]:
from datasets import Dataset as HFDataset

def convert_to_hf_dataset(med_dataset):
    """Materialize every formatted sample of *med_dataset* as a HuggingFace Dataset.

    Args:
        med_dataset: Object exposing an ``instruction`` sequence (its length
            sets the number of rows) and integer indexing that yields the
            formatted text for a row.

    Returns:
        A HuggingFace ``Dataset`` with a single ``'text'`` column, rows in
        index order.
    """
    n_rows = len(med_dataset.instruction)
    # Indexing goes through the dataset's __getitem__, which does the formatting.
    texts = [med_dataset[row] for row in range(n_rows)]
    return HFDataset.from_dict({'text': texts})

In [None]:
def generate_response(
    model,
    tokenizer,
    question,
    *,
    max_new_tokens=100,
    temperature=0.7,
    top_p=0.9,
    do_sample=True,
    strip_prompt=False,
):
    """Generate a text completion for *question* with *model*.

    Args:
        model: Causal language model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching *model*; called on the prompt and used
            to decode the generated ids.
        question: Prompt text passed to the tokenizer unchanged.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature forwarded to ``generate``.
        top_p: Nucleus-sampling threshold forwarded to ``generate``.
        do_sample: Whether ``generate`` samples instead of decoding greedily.
        strip_prompt: When True, drop the leading prompt tokens before
            decoding. The default (False) decodes the whole first sequence
            returned by ``generate``, as before.

    Returns:
        The decoded text with special tokens removed.
    """
    # Forward the whole encoding, not just input_ids, so the attention mask
    # reaches generate(); with pad_token == eos_token it cannot be inferred
    # reliably from the ids alone.
    encoded = tokenizer(question, return_tensors="pt").to(model.device)
    with torch.no_grad():
        outputs = model.generate(
            **encoded,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p,
            do_sample=do_sample,
            pad_token_id=tokenizer.eos_token_id,
        )
    sequence = outputs[0]
    if strip_prompt:
        prompt_length = encoded["input_ids"].shape[-1]
        sequence = sequence[prompt_length:]
    return tokenizer.decode(sequence, skip_special_tokens=True)

## 0-shot

In [None]:
# 4-bit NF4 quantization with float16 compute; double (nested) quantization is off.
compute_dtype = torch.float16

# `bnb_4bit_representation` is not a BitsAndBytesConfig parameter and was
# dropped; nested quantization is controlled by `bnb_4bit_use_double_quant`.
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=False,
)

# Define the model: zero-shot baseline, loaded from MODEL and never passed to the trainer.
model0 = AutoModelForCausalLM.from_pretrained(
    MODEL,
    quantization_config=quant_config,
    device_map={"": 0},  # place the whole model on device 0
    torch_dtype=torch.float16,
    trust_remote_code=True
)
model0.config.use_cache = False
model0.config.pretraining_tp = 1

## Few-shot

In [None]:
# First five rows of each column, passed positionally as (instruction, input, output).
few_dataset = MedDataset(*(ds[column][:5] for column in ("instruction", "input", "output")))
print(few_dataset)

In [None]:
# Wrap the five formatted prompts in a HuggingFace Dataset with a single 'text' column.
few_hf_dataset = convert_to_hf_dataset(few_dataset)

In [None]:
# 4-bit NF4 quantization with float16 compute; double (nested) quantization is off.
compute_dtype = torch.float16

# `bnb_4bit_representation` is not a BitsAndBytesConfig parameter and was
# dropped; nested quantization is controlled by `bnb_4bit_use_double_quant`.
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=compute_dtype,
    bnb_4bit_use_double_quant=False,
)

# Define the model: a second copy of MODEL, the one handed to the trainer below.
modelFEW = AutoModelForCausalLM.from_pretrained(
    MODEL,
    quantization_config=quant_config,
    device_map={"": 0},  # place the whole model on device 0
    torch_dtype=torch.float16,
    trust_remote_code=True
)
modelFEW.config.use_cache = False
modelFEW.config.pretraining_tp = 1

# Trainer hyper-parameters: one epoch, batch size 1 with 4-step gradient
# accumulation, paged 32-bit AdamW, no mixed precision, TensorBoard reporting.
# NOTE(review): lr_scheduler_type="constant" is combined with warmup_ratio=0.03;
# the plain constant schedule presumably ignores warmup - confirm whether
# "constant_with_warmup" was intended.
# NOTE(review): save_steps/logging_steps are 25 while the training set built
# above has five rows, so the run presumably ends before the first
# log/checkpoint step - confirm this is acceptable.
training_params = TrainingArguments(
    output_dir="./results",
    num_train_epochs=1,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=4,
    optim="paged_adamw_32bit",
    save_steps=25,
    logging_steps=25,
    learning_rate=2e-4,
    weight_decay=0.001,
    fp16=False,
    bf16=False,
    max_grad_norm=0.3,
    max_steps=-1,
    warmup_ratio=0.03,
    group_by_length=True,
    lr_scheduler_type="constant",
    report_to="tensorboard",
    gradient_checkpointing=True
)

# LoRA adapter configuration: rank 64, alpha 16, dropout 0.1, no bias training.
peft_params = LoraConfig(
    lora_alpha=16,
    lora_dropout=0.1,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
)

# Supervised fine-tuning of modelFEW on the 'text' column of few_hf_dataset.
# NOTE(review): passing dataset_text_field / max_seq_length / tokenizer / packing
# directly to SFTTrainer matches the pinned trl==0.12.0 install; other trl
# releases may expect them elsewhere - confirm before unpinning.
trainer = SFTTrainer(
    model=modelFEW,
    train_dataset=few_hf_dataset,
    peft_config=peft_params,
    dataset_text_field="text",
    max_seq_length=256,
    tokenizer=tokenizer,
    args=training_params,
    packing=False,
)

In [None]:
# Few-shot training: run the SFTTrainer configured above (modelFEW on the five-example dataset).
trainer.train()

In [None]:
# Prompt used to compare both models; the commented-out lines are alternative prompts.
# question = "What type of cells are found in the body of the stomach, where are they located, and what substances do they secrete?"
#question = 'What conditions are suggested by high ESR/CK and bilateral proximal muscle weakness, and high ESR/CRP with stiffness/pain in the shoulders, hip, and neck?'
question = 'What does dyspepsia refer to?'
# The fine-tuned model is queried first. generate_response samples its output,
# so the order of these two calls may influence the random draws.
responseFEW = generate_response(modelFEW, tokenizer, question)
print("Response FEW shot: ", responseFEW)

# Same prompt against model0, the copy of MODEL that was not passed to the trainer.
response0 = generate_response(model0, tokenizer, question)
print("\nResponse 0 shot: ", response0)

## Evaluate answer

`allenai/scibert_scivocab_uncased` (SciBERT) is a BERT model pretrained on scientific text.

In [None]:
 from transformers import AutoTokenizer, AutoModelForQuestionAnswering, pipeline

# Load the tokenizer and model
# The QA pipeline must tokenize with SciBERT's own vocabulary. The global
# `tokenizer` belongs to the Llama checkpoint, so its token ids do not match
# SciBERT's embedding table. A separate name keeps the Llama tokenizer intact
# for later generate_response calls.
qa_tokenizer = AutoTokenizer.from_pretrained("allenai/scibert_scivocab_uncased")
# NOTE(review): this checkpoint appears to be a base encoder without a
# fine-tuned question-answering head, in which case the extracted span and
# score are not meaningful - confirm, or switch to a QA-fine-tuned checkpoint.
benchmark = AutoModelForQuestionAnswering.from_pretrained("allenai/scibert_scivocab_uncased")

# Create a QA pipeline
qa_pipeline = pipeline("question-answering", model=benchmark, tokenizer=qa_tokenizer)

# Get the model's answer: the few-shot response is used as the context to extract from
result = qa_pipeline(question=question, context=responseFEW)

# Print the answer
print("Extracted Answer:", result["answer"])
print("Score:", result["score"])

Second evaluation method: BLEU (Bilingual Evaluation Understudy) measures the n-gram overlap between the generated answer and a reference answer. It is less suitable for medical QA because it is sensitive to word order and cannot capture semantic meaning.
It requires question–answer pairs, i.e. a reference answer for each question.

In [None]:
!pip install nltk

In [None]:
import nltk
nltk.download('punkt')

from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu(reference, candidate):
    """Compute the sentence-level BLEU score of a generated answer.

    Args:
        reference: Either one tokenized reference answer (a flat list of
            token strings) or a list of several tokenized references.
        candidate: The tokenized generated answer (list of token strings).

    Returns:
        The BLEU score as returned by ``sentence_bleu``, or ``None`` if
        scoring raised an exception (the error is printed).
    """
    # sentence_bleu expects a list of references, each a token list. A flat
    # token list would be read as many references, one per word, each
    # iterated character by character, so wrap it.
    if reference and isinstance(reference[0], str):
        references = [reference]
    else:
        references = reference

    # Deliberately best-effort: report the failure and return None.
    try:
        return sentence_bleu(references, candidate)
    except Exception as e:
        print(f"Error during BLUE computing: {e}")
        return None

# Reference answer for the dyspepsia question, split on whitespace into tokens.
reference = 'Dyspepsia refers to a spectrum of epigastric symptoms, including heartburn, "indigestion," bloating, and epigastric pain/discomfort.'.split()
# Candidate tokens: the few-shot model's decoded output, split the same way.
candidate = responseFEW.split()

bleu_score = calculate_bleu(reference, candidate)

# calculate_bleu returns None when scoring failed, so only a real score is printed.
if bleu_score is not None:
    print(f"Punteggio BLEU: {bleu_score}")