[58c332]: / archives / RadETL / R / DBMSio.R

Download this file

136 lines (119 with data), 6.0 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
################################ DBMSIO (include DatabaseConnector) Class #######################################
#' DBMSIO Class
#'
#' This class is a DBMS I/O class created using the DatabaseConnector package.
#'
#' @param server Enter the server address to access (See http://ohdsi.github.io/DatabaseConnector/reference/connect.html)
#' @param user Enter the user ID of the DBMS you want to access
#' @param pw Enter the password for the DBMS you want to access.
#' @param dbms Enter the DBMS type. ex: sql server, oracle
#' @example Examples/DBMSio_Ex.R
#' @author Neon K.I.D
#' @export
DBMSIO <- R6::R6Class(classname = "DBMSIO",
private = list(
con = NULL,
dbms = NULL,
# Using DatabaseConnector for OHDSI (included JDBC)
connectDBMS = function(server, user, pw, dbms, port) {
if(port != 0)
sql <- createConnectionDetails(dbms = dbms, user = user, password = pw, server = server, port = port)
else
sql <- createConnectionDetails(dbms = dbms, user = user, password = pw, server = server)
con <- connect(connectionDetails = sql)
return(con)
},
translateSql = function(query) {
translate(sql = query, targetDialect = private$dbms)
},
convertSql = function(query, ohdsiSchema) {
sql <- render(sql = query, ohdsiSchema = ohdsiSchema)
sql <- private$translateSql(query = sql)
return(sql)
},
setlastOccurID = function(tbS, progressBar = F) {
sql <- readSql(sourceFile = file.path(system.file('extdata/migration', package = 'RadETL', mustWork = T),
private$dbms, 'getMaxOccurID.sql'))
last_id <- querySql(connection = private$con, sql = sql)
writeLines('Refresh Occurrence ID...')
sql <- readSql(sourceFile = file.path(system.file('extdata/migration', package = 'RadETL', mustWork = T),
private$dbms, 'setNewOccurID.sql'))
rsql <- render(sql, ohdsiSchema = tbS, cur_id = last_id[1,,] + 1)
executeSql(connection = private$con, sql = rsql, progressBar = progressBar)
}
),
public = list(
initialize = function(server, user, pw, dbms, port = 0) {
# Using DatabaseConnector for OHDSI Package
if(!require(DatabaseConnector))
install.packages("DatabaseConnector")
library(DatabaseConnector)
if(!require(SqlRender))
install.packages("SqlRender")
library(SqlRender)
private$dbms <- dbms
private$con <- private$connectDBMS(server, user, pw, dbms, port)
},
# [NOTICE]
# Before call this function, create database and table,,
# using DatabaseConnector, but use connection DBMSIO object.
# Read Radiology Database table columns,,
# occur_rows: Radiology_Occurrence.rda
# img_rows: Radiology_Image.rda
insertDB = function(tbS = 'dbo', data, createTable = FALSE, tempTable = FALSE, useMppBulkLoad = FALSE, progressBar = FALSE) {
if(all(colnames(data) == occur_cols)) {
tableName <- Reduce(pasteSQL, c(tbS, 'Radiology_Occurrence'))
writeLines(text = sprintf('Execute DDL Query for %s', tableName))
if(createTable) {
osql <- readSql(sourceFile = file.path(system.file('extdata/ddl', package = 'RadETL', mustWork = T),
private$dbms, 'Radiology_Occurrence.sql'))
executeSql(connection = private$con, sql = private$convertSql(osql, ohdsiSchema = tbS))
}
} else if(all(colnames(data) == img_cols)) {
tableName <- Reduce(pasteSQL, c(tbS, 'Radiology_Image'))
writeLines(text = sprintf('Execute DDL Query for %s', tableName))
if(createTable) {
osql <- readSql(sourceFile = file.path(system.file('extdata/ddl', package = 'RadETL', mustWork = T),
private$dbms, 'Radiology_Image.sql'))
executeSql(connection = private$con, sql = private$convertSql(osql, ohdsiSchema = tbS))
}
} else stop("This data is not Radiology CDM \n Please check data and retry...")
writeLines(text = sprintf('Insert the %s into the database', tableName))
val <- paste0(apply(data, 1, function(x) paste0("('", paste0(x, collapse = "', '"), "')")), collapse = ", ")
val <- gsub(x = gsub(pattern = "NA|'\\'", replacement = "NULL", val), pattern = "\\'NULL'", replacement = "NULL", val)
sql <- paste0("INSERT INTO ", tableName, " VALUES ", val)
executeSql(private$con, sql)
private$setlastOccurID(tbS, progressBar)
},
# [WARNING]
# This function DROPs all RCDMs, including Occurrence and Image.
dropDB = function(tbS) {
writeLines(text = "[WARNING]\nThis function erases all data in the RCDM ! \nThis action can not be undone.")
ch <- readline("Would you like to continue? [y/N]: ")
switch(tolower(ch), y = {
osql <- readSql(sourceFile = file.path(system.file('extdata/ddl', package = 'RadETL', mustWork = T),
private$dbms, 'rollback', 'Drop_RCDM.sql'))
rsql <- render(sql = osql, ohdsiSchema = tbS)
writeLines(text = "Drop RCDM tables...")
executeSql(connection = private$con, sql = translate(rsql, private$dbms), progressBar = T)
})
},
# Using SQL for RDBMS...
dbGetdtS = function(dbS, tbS, condition = NULL) {
dbSchema <- c(dbS, tbS)
tb <- Reduce(pasteSQL, dbSchema)
if(is.null(condition))
sql <- paste0("SELECT * FROM ", tb)
else
sql <- paste0("SELECT * FROM ", tb, " WHERE ", condition)
return(querySql(connection = private$con, sql = private$translateSql(query = sql)))
},
executeSql = function(sql) {
executeSql(connection = private$con, sql = private$translateSql(query = sql))
},
querySql = function(sql) {
querySql(connection = private$con, sql = private$translateSql(query = sql))
},
finalize = function() disconnect(private$con)
)
)