tools/stanford/download_bigquery.py
"""
A tool for downloading datasets from BigQuery.

Setup:
```
    pip install --upgrade google-cloud-bigquery
    pip install --upgrade google-cloud-storage
    conda install google-cloud-sdk -c conda-forge
```

Note: After installing the above packages, run `gcloud auth application-default login` in your terminal.
You will be prompted with an authorization link that you will need to follow and approve using your
email address. Then copy-paste the authorization code into the terminal.

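You can optionally verify that your credentials are set up correctly; if so, the following prints an
access token instead of an error:
```
    gcloud auth application-default print-access-token
```
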
How to run:
```
python download_bigquery.py <NAME OF YOUR GCP PROJECT> <GCP BIGQUERY DATASET ID> \
    <PATH TO LOCAL FOLDER WHERE DATASET WILL BE DOWNLOADED> \
    --excluded_tables <(Optional) NAME OF TABLE 1 TO BE IGNORED> <(Optional) NAME OF TABLE 2 TO BE IGNORED>
```

Example: python download_bigquery.py som-nero-nigam-starr \
    som-rit-phi-starr-prod.starr_omop_cdm5_deid_1pcent_lite_2023_02_08 .
"""

from __future__ import annotations

import argparse
import hashlib
import os
import random
import threading
from functools import partial

import google.api_core.exceptions
from google.cloud import bigquery, storage

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download a Google BigQuery dataset")
    parser.add_argument(
        "gcp_project_name",
        type=str,
        help=(
            "The name of *YOUR* GCP project (e.g. 'som-nero-nigam-starr')."
            " Note that this need NOT be the GCP project that contains the dataset."
            " It just needs to be a GCP project where you have Bucket creation + BigQuery creation permissions."
        ),
    )
    parser.add_argument(
        "gcp_dataset_id",
        type=str,
        help=(
            "The Dataset ID of the GCP dataset to download"
            " (e.g. 'som-rit-phi-starr-prod.starr_omop_cdm5_deid_2022_12_03')."
            " Note that this is the full ID of the dataset (project name + dataset name)."
        ),
    )
    parser.add_argument(
        "output_dir",
        type=str,
        help=(
            "Path to output directory. Note: The downloaded files will be saved in a subdirectory of this,"
            " i.e. `output_dir/gcp_dataset_id/...`"
        ),
    )
    parser.add_argument(
        "--excluded_tables",
        type=str,
        nargs="*",  # 0 or more values expected => creates a list
        default=[],
        help=(
            "Optional. Name(s) of tables to exclude. List tables separated by spaces,"
            " i.e. `--excluded_tables observation note_nlp`"
        ),
    )
    parser.add_argument(
        "--scratch_bucket_postfix",
        type=str,
        default="_extract_scratch",
        help="The postfix for the GCP bucket used for storing temporary files while downloading.",
    )
    args = parser.parse_args()

    target = f"{args.output_dir}/{args.gcp_dataset_id}"
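    # os.mkdir (unlike os.makedirs) raises if `target` already exists or if `output_dir` is missing,
    # so a fresh run never silently mixes its files into an old download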
    os.mkdir(target)

    print('Make sure to run "gcloud auth application-default login" before running this command')

    # Connect to our GCP project (BigQuery + Cloud Storage clients)
    client = bigquery.Client(project=args.gcp_project_name)
    storage_client = storage.Client(project=args.gcp_project_name)

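    # A random per-run prefix for the scratch files, so that concurrent or repeated runs don't
    # overwrite each other's temporary objects (note: `random.randbytes` requires Python 3.9+)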
    random_dir = hashlib.md5(random.randbytes(16)).hexdigest()

    scratch_bucket_name = args.gcp_project_name.replace("-", "_") + args.scratch_bucket_postfix

    print(f"Storing temporary files in gs://{scratch_bucket_name}/{random_dir}")

    try:
        bucket = storage_client.get_bucket(scratch_bucket_name)
    except google.api_core.exceptions.NotFound as e:
        print(f"Could not find the scratch bucket gs://{scratch_bucket_name} in project {args.gcp_project_name}")
        raise e
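
    # NOTE: The scratch bucket must already exist; this script does not create it. One way to
    # create it up front (a minimal sketch, assuming default location and storage-class settings):
    #
    #     storage_client.create_bucket(scratch_bucket_name)
    #
    # or from the shell: `gsutil mb -p <YOUR GCP PROJECT> gs://<BUCKET NAME>`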

    # Get list of all tables in this GCP dataset
    # NOTE: the `HTTPIterator` can be iterated over like a list, but only once (it's a generator)
    tables: google.api_core.page_iterator.HTTPIterator = client.list_tables(args.gcp_dataset_id)
    print(f"Downloading dataset {args.gcp_dataset_id} using your project {args.gcp_project_name}")

    # Use GZIP compression and export as CSVs
    extract_config = bigquery.job.ExtractJobConfig(
        compression=bigquery.job.Compression.GZIP,
        destination_format=bigquery.job.DestinationFormat.CSV,
        field_delimiter=",",
    )
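
    # (If you'd rather have typed columns, BigQuery extracts can also emit Parquet via
    # `bigquery.job.DestinationFormat.PARQUET`; this script sticks with gzipped CSVs.)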

    sem = threading.Semaphore(value=0)  # counts completed extract jobs; each done-callback releases it once

    def download(table_id: str, f):
        """Callback fired when the BigQuery extract job for one table finishes.

        It checks the job for errors and releases the semaphore so the main thread can count
        completed extracts. Note that a single table will be extracted into possibly dozens of
        smaller .csv.gz files; the actual copy to the local filesystem happens later, via
        `gsutil rsync`.

        Args:
            table_id (str): Name of table (e.g. "attribute_definition")
            f: The finished extract job (passed in by `add_done_callback`)
        """
        if f.errors is not None:
            print("Could not extract, got errors", f.errors, "for", table_id)
            os.abort()
        sem.release()

    n_tables: int = 0
    for table in tables:
        # Get the full name of the table
        table_name: str = f"{table.project}.{table.dataset_id}.{table.table_id}"
        if table.table_id in args.excluded_tables:
            print(f"Skipping extraction | table = {table.table_id}")
            continue
        print(f"Extracting | table = {table.table_id}")
        # Destination path in the scratch Google Cloud Storage bucket; the `*` wildcard lets
        # BigQuery shard the export across multiple .csv.gz files
        bucket_target_path: str = f"gs://{scratch_bucket_name}/{random_dir}/{table.table_id}/*.csv.gz"
        extract_job = client.extract_table(table.reference, bucket_target_path, job_config=extract_config)
        # Register `download()` to run when this extract job completes (the jobs run server-side, in parallel)
        extract_job.add_done_callback(partial(download, table.table_id))
        n_tables += 1

    print(f"\n** Extracting a total of {n_tables} tables **\n")
    # Block until every extract job's done-callback has released the semaphore once
    for i in range(1, n_tables + 1):
        sem.acquire()
        print(f"====> Finished extracting {i} out of {n_tables} tables")

    print("Starting to download tables")

    os.system(f"gsutil -m rsync -r gs://{scratch_bucket_name}/{random_dir} {target}")
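
    # NOTE: `os.system` ignores a non-zero exit status from gsutil; a stricter variant (a sketch
    # along the same lines) would check the exit status, e.g.:
    #
    #     import subprocess
    #     subprocess.run(
    #         ["gsutil", "-m", "rsync", "-r", f"gs://{scratch_bucket_name}/{random_dir}", target],
    #         check=True,
    #     )
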
    print("------\n------")
    print("Successfully downloaded all tables!")
    print("------\n------")

    # Delete the temporary files from the Google Cloud Storage scratch bucket
    print("\nDeleting temporary files...")
    os.system(f"gsutil -m rm -r gs://{scratch_bucket_name}/{random_dir}")
    print("------\n------")
    print("Successfully deleted temporary Google Cloud Storage files!")
    print("------\n------")