In [3]:
# Install required packages
%pip install openai==1.12.0 azure-kusto-data langchain tenacity langchain-openai

# Import libraries
from pyspark.sql import SparkSession
from notebookutils import mssparkutils
from openai import AzureOpenAI
import pandas as pd
import json
from tenacity import retry, wait_random_exponential, stop_after_attempt

# Restart the kernel after installation if you get any import errors

StatementMeta(, 7206fc12-bc87-410f-b591-2e3863c232ba, 10, Finished, Available, Finished)

Collecting openai==1.12.0
  Downloading openai-1.12.0-py3-none-any.whl.metadata (18 kB)
Collecting azure-kusto-data
  Downloading azure_kusto_data-5.0.2-py2.py3-none-any.whl.metadata (4.2 kB)
Collecting langchain
  Downloading langchain-0.3.22-py3-none-any.whl.metadata (7.8 kB)
Collecting langchain-openai
  Downloading langchain_openai-0.3.11-py3-none-any.whl.metadata (2.3 kB)
Collecting httpx<1,>=0.23.0 (from openai==1.12.0)
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting pydantic<3,>=1.9.0 (from openai==1.12.0)
  Downloading pydantic-2.11.1-py3-none-any.whl.metadata (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.5/63.5 kB[0m [31m316.7 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting requests>=2.32.3 (from azure-kusto-data)
  Downloading requests-2.32.3-py3-none-any.whl.metadata (4.6 kB)
Collecting azure-identity<2,>=1.21.0 (from azure-kusto-data)
  Downloading azure_identity-1.21.0-py3-none-any.whl.metadata (81 kB)
[2K 

In [5]:
# Configuration
OPENAI_GPT4_DEPLOYMENT_NAME = "gpt-4"
OPENAI_DEPLOYMENT_ENDPOINT = mssparkutils.credentials.getSecret("openai-scope", "OPENAI_ENDPOINT")
OPENAI_API_KEY = mssparkutils.credentials.getSecret("openai-scope", "OPENAI_KEY")
OPENAI_ADA_EMBEDDING_DEPLOYMENT_NAME = "text-embedding-ada-002"

KUSTO_URI = "https://trd-zdxwqrcu1znbqygpxg.z2.kusto.fabric.microsoft.com"
KUSTO_DATABASE = "BioEventHouse"
KUSTO_TABLE = "biospecimen_embeddings"
accessToken = mssparkutils.credentials.getToken(KUSTO_URI)

client = AzureOpenAI(
    azure_endpoint=OPENAI_DEPLOYMENT_ENDPOINT,
    api_key=OPENAI_API_KEY,
    api_version="2023-09-01-preview"
)

@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
def generate_embeddings(text):
    txt = text.replace("\n", " ")
    return client.embeddings.create(input=[txt], model=OPENAI_ADA_EMBEDDING_DEPLOYMENT_NAME).data[0].embedding

# Data Preparation
def prepare_data():
    # Read from Lakehouse
    df = spark.read.format("csv").option("header", "true").load("/lakehouse/default/Files/biospecimen_data.csv")
    
    # Convert to pandas for easier processing
    pdf = df.toPandas()
    
    # Create document text
    def create_document_text(row):
        text_parts = []
        if pd.notna(row['Sample Type']):
            text_parts.append(f"Sample Type: {row['Sample Type']}")
        if pd.notna(row['Primary Site']):
            text_parts.append(f"Primary Site: {row['Primary Site']}")
        # Add other fields as shown in previous example
        return ". ".join(text_parts)
    
    pdf['document_text'] = pdf.apply(create_document_text, axis=1)
    
    # Generate embeddings
    pdf['embedding'] = pdf['document_text'].apply(lambda x: generate_embeddings(x))
    
    # Prepare for Kusto
    embeddings_df = pd.DataFrame({
        'document_id': pdf['Aliquot ID'],
        'content': pdf['document_text'],
        'metadata': pdf.apply(lambda x: {
            'sample_type': x['Sample Type'],
            'primary_site': x['Primary Site'],
            'case_id': x['Case Submitter ID']
        }, axis=1),
        'embedding': pdf['embedding']
    })
    
    return spark.createDataFrame(embeddings_df)

# Store in Eventhouse
def store_embeddings(embeddings_df):
    embeddings_df.write.\
        format("com.microsoft.kusto.spark.synapse.datasource").\
        option("kustoCluster", KUSTO_URI).\
        option("kustoDatabase", KUSTO_DATABASE).\
        option("kustoTable", KUSTO_TABLE).\
        option("accessToken", accessToken).\
        mode("Append").save()

# Query System
def query_biospecimen_data(question, nr_of_answers=3):
    searchedEmbedding = generate_embeddings(question)
    
    kusto_query = f"""
    {KUSTO_TABLE} 
    | extend similarity = series_cosine_similarity(dynamic({str(searchedEmbedding)}), embedding) 
    | top {nr_of_answers} by similarity desc
    | project content, metadata, similarity
    """
    
    kustoDf = spark.read\
        .format("com.microsoft.kusto.spark.synapse.datasource")\
        .option("kustoCluster", KUSTO_URI)\
        .option("kustoDatabase", KUSTO_DATABASE)\
        .option("accessToken", accessToken)\
        .option("kustoQuery", kusto_query).load()
    
    results = [row.asDict() for row in kustoDf.collect()]
    
    # Prepare context for LLM
    context = "\n\n".join([f"Record {i+1} (Similarity: {r['similarity']:.2f}):\n{r['content']}\nMetadata: {r['metadata']}" 
                          for i, r in enumerate(results)])
    
    prompt = f"""
    You are a biomedical research assistant analyzing breast cancer biospecimen data.
    Answer the user's question based on the following records. Be precise and cite specific records when relevant.
    
    Question: {question}
    
    Relevant Biospecimen Records:
    {context}
    """
    
    response = client.chat.completions.create(
        model=OPENAI_GPT4_DEPLOYMENT_NAME,
        messages=[
            {"role": "system", "content": "You are a knowledgeable biomedical research assistant."},
            {"role": "user", "content": prompt}
        ],
        temperature=0
    )
    
    return {
        "answer": response.choices[0].message.content,
        "sources": results
    }

# Initialize the system (run once)
embeddings_df = prepare_data()
store_embeddings(embeddings_df)
print("RAG system initialized successfully!")
# Type here in the cell editor to add code!


StatementMeta(, 7206fc12-bc87-410f-b591-2e3863c232ba, 13, Finished, Available, Finished)

Py4JJavaError: An error occurred while calling z:mssparkutils.credentials.getSecret.
: com.microsoft.azure.trident.tokenlibrary.util.AkvHttpClientException: Invalid vault uri. Uri should match azure key vault URI like https://<keyVaultName>.vault.azure.net/
	at com.microsoft.azure.trident.tokenlibrary.util.AkvBasedSecretProviderClientImpl.invokeGetTarget(AkvBasedSecretProviderClient.scala:122)
	at com.microsoft.azure.trident.tokenlibrary.util.AkvBasedSecretProviderClientImpl.getAkvSecretWithAccessToken(AkvBasedSecretProviderClient.scala:153)
	at com.microsoft.azure.trident.tokenlibrary.TokenLibrary.getSecretWithToken(TokenLibrary.scala:806)
	at com.microsoft.azure.trident.tokenlibrary.TokenLibrary$.getSecretWithToken(TokenLibrary.scala:1359)
	at mssparkutils.credentials$.getSecret(credentials.scala:166)
	at mssparkutils.credentials.getSecret(credentials.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
