tools/stanford/download_bigquery.py

"""
A tool for downloading datasets from BigQuery.

Setup:
```
pip install --upgrade google-cloud-bigquery
pip install --upgrade google-cloud-storage
conda install google-cloud-sdk -c conda-forge
```

Note: After installing the above packages, run `gcloud auth application-default login` in your terminal.
You will be prompted with an authorization link that you will need to follow and approve using your
email address. Then copy and paste the authorization code into the terminal.

How to run:
```
python download_bigquery.py <NAME OF YOUR GCP PROJECT> <GCP BIGQUERY DATASET ID> \
    <PATH TO LOCAL FOLDER WHERE DATASET WILL BE DOWNLOADED> \
    --excluded_tables <(Optional) NAME OF TABLE 1 TO BE IGNORED> <(Optional) NAME OF TABLE 2 TO BE IGNORED>
```

Example: python download_bigquery.py som-nero-nigam-starr \
    som-rit-phi-starr-prod.starr_omop_cdm5_deid_1pcent_lite_2023_02_08 .
"""

from __future__ import annotations

import argparse
import hashlib
import os
import random
import threading
from functools import partial

import google
from google.cloud import bigquery, storage

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download a Google BigQuery dataset")
    parser.add_argument(
        "gcp_project_name",
        type=str,
        help=(
            "The name of *YOUR* GCP project (e.g. 'som-nero-nigam-starr')."
            " Note that this need NOT be the GCP project that contains the dataset."
            " It just needs to be a GCP project where you have Bucket creation + BigQuery creation permissions."
        ),
    )
    parser.add_argument(
        "gcp_dataset_id",
        type=str,
        help=(
            "The Dataset ID of the GCP dataset to download"
            " (e.g. 'som-rit-phi-starr-prod.starr_omop_cdm5_deid_2022_12_03')."
            " Note that this is the full ID of the dataset (project name + dataset name)"
        ),
    )
    parser.add_argument(
        "output_dir",
        type=str,
        help=(
            "Path to output directory. Note: The downloaded files will be saved in a subdirectory of this,"
            " i.e. `output_dir/gcp_dataset_id/...`"
        ),
    )
    parser.add_argument(
        "--excluded_tables",
        type=str,
        nargs="*",  # 0 or more values expected => creates a list
        default=[],
        help=(
            "Optional. Name(s) of tables to exclude. List tables separated by spaces,"
            " i.e. `--excluded_tables observation note_nlp`"
        ),
    )
    parser.add_argument(
        "--scratch_bucket_postfix",
        type=str,
        default="_extract_scratch",
        help="The postfix for the GCP bucket used for storing temporary files while downloading.",
    )
    args = parser.parse_args()
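
    # Overall flow of this script:
    #   1. Ask BigQuery to extract every (non-excluded) table in the dataset into compressed CSVs
    #      under a scratch Google Cloud Storage bucket.
    #   2. Wait until every extract job has finished.
    #   3. Copy the scratch files to the local output directory with `gsutil rsync`.
    #   4. Delete the scratch files from Google Cloud Storage.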

    target = f"{args.output_dir}/{args.gcp_dataset_id}"
    os.mkdir(target)

    print('Make sure to run "gcloud auth application-default login" before running this command')

    # Connect to our BigQuery project
    client = bigquery.Client(project=args.gcp_project_name)
    storage_client = storage.Client(project=args.gcp_project_name)
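
    # A random subdirectory name keeps this run's scratch files separate from those of any other run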
    random_dir = hashlib.md5(random.randbytes(16)).hexdigest()

    scratch_bucket_name = args.gcp_project_name.replace("-", "_") + args.scratch_bucket_postfix

    print(f"Storing temporary files in gs://{scratch_bucket_name}/{random_dir}")

    try:
        bucket = storage_client.get_bucket(scratch_bucket_name)
    except google.api_core.exceptions.NotFound as e:
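        # The scratch bucket is not created automatically. If it does not exist yet, one way to
        # create it (assuming a default location and storage class are acceptable) is:
        #   gsutil mb -p <NAME OF YOUR GCP PROJECT> gs://<scratch bucket name>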
        print(f"Could not find the requested bucket gs://{scratch_bucket_name} in project {args.gcp_project_name}")
        raise e

    # Get list of all tables in this GCP dataset
    # NOTE: the `HTTPIterator` can be iterated over like a list, but only once (it's a generator)
    tables: google.api_core.page_iterator.HTTPIterator = client.list_tables(args.gcp_dataset_id)
    print(f"Downloading dataset {args.gcp_dataset_id} using your project {args.gcp_project_name}")

    # Use GZIP compression and export as CSVs
    extract_config = bigquery.job.ExtractJobConfig(
        compression=bigquery.job.Compression.GZIP,
        destination_format=bigquery.job.DestinationFormat.CSV,
        field_delimiter=",",
    )

    sem = threading.Semaphore(value=0)  # counts how many tables have finished extracting to the scratch bucket

    def download(table_id: str, f) -> None:
        """Callback fired when a table's BigQuery extract job completes.

        This does not copy anything to the local filesystem itself: it aborts the program if the
        extract job reported errors, and otherwise releases the semaphore so the main thread knows
        one more table's .csv.gz files are ready in the scratch bucket. (The actual download
        happens later via `gsutil rsync`.) Note that a single table may be extracted into dozens
        of smaller .csv.gz files.

        Args:
            table_id (str): Name of the table (e.g. "attribute_definition")
            f: The completed extract job
        """
        if f.errors is not None:
            print("Could not extract, got errors", f.errors, "for", table_id)
            os.abort()
        sem.release()

    n_tables: int = 0
    for table in tables:
        # Get the full name of the table
        table_name: str = f"{table.project}.{table.dataset_id}.{table.table_id}"
        if table.table_id in args.excluded_tables:
            print(f"Skipping extraction | table = {table.table_id}")
            continue
        print(f"Extracting | table = {table.table_id}")
        # Extract this table into a folder of .csv.gz files inside the scratch Google Cloud Storage bucket
        bucket_target_path: str = f"gs://{scratch_bucket_name}/{random_dir}/{table.table_id}/*.csv.gz"
        extract_job = client.extract_table(table.reference, bucket_target_path, job_config=extract_config)
        # When the (asynchronous) extract job finishes, `download()` is invoked with this table's name
        extract_job.add_done_callback(partial(download, table.table_id))
        n_tables += 1

    print(f"\n** Extracting a total of {n_tables} tables**\n")
    for i in range(1, n_tables + 1):
        sem.acquire()
        print(f"====> Finished extracting {i} out of {n_tables} tables")

    print("Starting to download tables")
    os.system(f"gsutil -m rsync -r gs://{scratch_bucket_name}/{random_dir} {target}")

    print("------\n------")
    print("Successfully downloaded all tables!")
    print("------\n------")

    # Delete the temporary files from the scratch Google Cloud Storage bucket
    print("\nDeleting temporary files...")
    os.system(f"gsutil -m rm -r gs://{scratch_bucket_name}/{random_dir}")
    print("------\n------")
    print("Successfully deleted temporary Google Cloud Storage files!")
    print("------\n------")