
Commit

Merge branch 'develop'
# Conflicts:
#	inst/doc/DbiAndDbplyr.pdf
Schuemie authored and Schuemie committed Mar 16, 2023
2 parents 5176e98 + 8884db5 commit 25dd311
Showing 89 changed files with 1,046 additions and 302 deletions.
6 changes: 3 additions & 3 deletions CRAN-SUBMISSION
@@ -1,3 +1,3 @@
-Version: 6.0.0
-Date: 2023-01-30 10:59:45 UTC
-SHA: 352a46f3b48247aaf2c39336f719015996b05b42
+Version: 6.1.0
+Date: 2023-03-15 13:49:01 UTC
+SHA: e2945a54d4e4a305dd855ddb3e551394bddd55e6
8 changes: 4 additions & 4 deletions DESCRIPTION
@@ -1,8 +1,8 @@
Package: DatabaseConnector
Type: Package
Title: Connecting to Various Database Platforms
-Version: 6.0.0
-Date: 2023-01-30
+Version: 6.1.0
+Date: 2023-03-15
Authors@R: c(
person("Martijn", "Schuemie", email = "[email protected]", role = c("aut", "cre")),
person("Marc", "Suchard", role = c("aut")),
@@ -20,7 +20,7 @@ Depends:
R (>= 4.0.0)
Imports:
rJava,
-  SqlRender (>= 1.12.0),
+  SqlRender (>= 1.13.0),
methods,
stringr,
readr,
@@ -31,7 +31,7 @@ Imports:
bit64,
checkmate,
digest,
-  dbplyr (>= 2.3.0)
+  dbplyr (>= 2.2.0)
Suggests:
aws.s3,
R.utils,
22 changes: 22 additions & 0 deletions NEWS.md
@@ -1,3 +1,25 @@
+DatabaseConnector 6.1.0
+=======================
+
+Changes:
+
+1. Added support for DuckDB.
+
+Bugfixes:
+
+1. Fixed `capacity < 0` error message when using a large Java heap space.
+
+2. Fixed 'optional feature not supported' error when connecting to Databricks using JDBC.
+
+3. Fixed `insertTable()` on Snowflake when the data includes the `POSIXct` type.
+
+4. Fixed 'out of Java heap space' when fetching data with (large) strings. This is achieved by checking the available Java heap space every 10,000 rows and stopping the batch when less than half is still available. Additionally, all strings from the previous batch are de-referenced before starting a new batch.
+
+5. Fixed DATETIME shifts on Snowflake.
+
+6. Fixed unit tests for the upcoming Andromeda version (which uses arrow instead of SQLite).
+
+
DatabaseConnector 6.0.0
=======================

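For anyone trying the DuckDB support added in 6.1.0 (NEWS item above): a DuckDB connection is created like any other dbms. A minimal sketch, assuming a local database file at a hypothetical path; per connectDuckdb() in R/Connect.R below, the server field is passed to duckdb as the database directory:

    library(DatabaseConnector)
    # Minimal sketch; the database path is hypothetical.
    connectionDetails <- createConnectionDetails(
      dbms = "duckdb",
      server = "/path/to/database.duckdb"
    )
    connection <- connect(connectionDetails)
    querySql(connection, "SELECT 1;")
    # For DuckDB, disconnect() now calls DBI::dbDisconnect(..., shutdown = TRUE),
    # fully shutting down the embedded database (see R/Connect.R below).
    disconnect(connection)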
18 changes: 2 additions & 16 deletions R/Andromeda.R
@@ -79,7 +79,8 @@ lowLevelQuerySqlToAndromeda.default <- function(connection,
"org.ohdsi.databaseConnector.BatchedQuery",
connection@jConnection,
query,
-    dbms(connection)
+    dbms(connection),
+    supportsAutoCommit(dbms(connection))
)

on.exit(rJava::.jcall(batchedQuery, "V", "clear"))
@@ -262,21 +263,6 @@ querySqlToAndromeda <- function(connection,
integerAsNumeric = integerAsNumeric,
integer64AsNumeric = integer64AsNumeric
)
-      # if (inherits(andromeda, "SQLiteConnection")) {
-      #   columnNames <- colnames(andromeda[[andromedaTableName]])
-      #   if (snakeCaseToCamelCase) {
-      #     newColumnNames <- SqlRender::snakeCaseToCamelCase(columnNames)
-      #   } else {
-      #     newColumnNames <- toupper(columnNames)
-      #   }
-      #   names(andromeda[[andromedaTableName]]) <- newColumnNames
-      # } else {
-      #   if (snakeCaseToCamelCase) {
-      #     andromeda[[andromedaTableName]] <- dplyr::rename_with(andromeda[[andromedaTableName]], SqlRender::snakeCaseToCamelCase)
-      #   } else {
-      #     andromeda[[andromedaTableName]] <- dplyr::rename_with(andromeda[[andromedaTableName]], toupper)
-      #   }
-      # }
invisible(andromeda)
},
error = function(err) {
47 changes: 39 additions & 8 deletions R/Connect.R
@@ -31,7 +31,8 @@ checkIfDbmsIsSupported <- function(dbms) {
    "sqlite extended",
    "spark",
    "snowflake",
-    "synapse"
+    "synapse",
+    "duckdb"
)
if (!dbms %in% supportedDbmss) {
abort(sprintf(
@@ -279,6 +280,8 @@ connect <- function(connectionDetails = NULL,

if (connectionDetails$dbms %in% c("sqlite", "sqlite extended")) {
connectSqlite(connectionDetails)
+  } else if (connectionDetails$dbms == "duckdb") {
+    connectDuckdb(connectionDetails)
} else if (connectionDetails$dbms == "spark" && is.null(connectionDetails$connectionString())) {
connectSparkUsingOdbc(connectionDetails)
} else {
@@ -636,14 +639,22 @@ connectSpark <- function(connectionDetails) {
inform("Connecting using Spark JDBC driver")
jarPath <- findPathToJar("^SparkJDBC42\\.jar$", connectionDetails$pathToDriver)
driver <- getJbcDriverSingleton("com.simba.spark.jdbc.Driver", jarPath)
-  if (is.null(connectionDetails$connectionString()) || connectionDetails$connectionString() == "") {
+  connectionString <- connectionDetails$connectionString()
+  if (is.null(connectionString) || connectionString == "") {
abort("Error: Connection string required for connecting to Spark.")
}
+  if (!grepl("UseNativeQuery", connectionString)) {
+    if (!endsWith(connectionString, ";")) {
+      connectionString <- paste0(connectionString, ";")
+    }
+    connectionString <- paste0(connectionString, "UseNativeQuery=1")
+  }
if (is.null(connectionDetails$user())) {
-    connection <- connectUsingJdbcDriver(driver, connectionDetails$connectionString(), dbms = connectionDetails$dbms)
+    connection <- connectUsingJdbcDriver(driver, connectionString, dbms = connectionDetails$dbms)
} else {
-    connection <- connectUsingJdbcDriver(driver,
-      connectionDetails$connectionString(),
+    connection <- connectUsingJdbcDriver(
+      jdbcDriver = driver,
+      url = connectionString,
user = connectionDetails$user(),
password = connectionDetails$password(),
dbms = connectionDetails$dbms
@@ -683,13 +694,15 @@ connectSnowflake <- function(connectionDetails) {
abort("Error: Connection string required for connecting to Snowflake.")
}
if (is.null(connectionDetails$user())) {
-    connection <- connectUsingJdbcDriver(driver, connectionDetails$connectionString(), dbms = connectionDetails$dbms)
+    connection <- connectUsingJdbcDriver(driver, connectionDetails$connectionString(), dbms = connectionDetails$dbms,
+      "CLIENT_TIMESTAMP_TYPE_MAPPING"="TIMESTAMP_NTZ")
} else {
connection <- connectUsingJdbcDriver(driver,
connectionDetails$connectionString(),
user = connectionDetails$user(),
password = connectionDetails$password(),
-      dbms = connectionDetails$dbms
+      dbms = connectionDetails$dbms,
+      "CLIENT_TIMESTAMP_TYPE_MAPPING"="TIMESTAMP_NTZ"
)
}
return(connection)
@@ -769,6 +782,20 @@ connectUsingDbi <- function(dbiConnectionDetails) {
return(connection)
}

+connectDuckdb <- function(connectionDetails) {
+  inform("Connecting using DuckDB driver")
+  ensure_installed("duckdb")
+  connection <- connectUsingDbi(
+    createDbiConnectionDetails(
+      dbms = connectionDetails$dbms,
+      drv = duckdb::duckdb(),
+      dbdir = connectionDetails$server(),
+      bigint = "integer64"
+    )
+  )
+  return(connection)
+}

generateRandomString <- function(length = 20) {
return(paste(sample(c(letters, 0:9), length, TRUE), collapse = ""))
}
@@ -810,7 +837,11 @@ disconnect.default <- function(connection) {

#' @export
disconnect.DatabaseConnectorDbiConnection <- function(connection) {
-  DBI::dbDisconnect(connection@dbiConnection)
+  if (connection@dbms == "duckdb") {
+    DBI::dbDisconnect(connection@dbiConnection, shutdown = TRUE)
+  } else {
+    DBI::dbDisconnect(connection@dbiConnection)
+  }
unregisterWithRStudio(connection)
invisible(TRUE)
}
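A worked example of the UseNativeQuery normalization added to connectSpark() above, using a hypothetical JDBC URL. The flag is appended, after a separating semicolon if one is missing, so the Simba driver passes the already-translated SQL through to Spark without rewriting it:

    # Hypothetical Spark JDBC URL without the UseNativeQuery setting:
    connectionString <- "jdbc:spark://example.cloud.databricks.com:443/default"
    if (!grepl("UseNativeQuery", connectionString)) {
      if (!endsWith(connectionString, ";")) {
        connectionString <- paste0(connectionString, ";")
      }
      connectionString <- paste0(connectionString, "UseNativeQuery=1")
    }
    connectionString
    # [1] "jdbc:spark://example.cloud.databricks.com:443/default;UseNativeQuery=1"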
3 changes: 2 additions & 1 deletion R/DBI.R
@@ -265,7 +265,8 @@ setMethod(
"org.ohdsi.databaseConnector.BatchedQuery",
conn@jConnection,
statement,
-    dbms
+    dbms,
+    supportsAutoCommit(dbms)
)
result <- new("DatabaseConnectorJdbcResult",
content = batchedQuery,
2 changes: 1 addition & 1 deletion R/DbiDateFunctions.R
@@ -106,7 +106,7 @@ eoMonth <- function(date) {
#'
#' @export
dateFromParts <- function(year, month, day) {
-  return(as.Date(paste(year, month, day, sep='-')))
+  return(as.Date(paste(year, month, day, sep='-'), optional = TRUE))
}

#' Extract the year from a date
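For context on the optional = TRUE change above: as.Date() throws "character string is not in a standard unambiguous format" when the parts do not form a parseable date, whereas with optional = TRUE it returns NA instead. A small sketch of the expected behavior:

    dateFromParts(2023, 3, 15)  # Date "2023-03-15"
    dateFromParts(2023, 2, 30)  # NA instead of an error: February 30 does not exist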
4 changes: 4 additions & 0 deletions R/HelperFunctions.R
@@ -139,3 +139,7 @@ extractQueryTimesFromThread <- function(thread, logLines) {
return(result)
}

+supportsAutoCommit <- function(dbms) {
+  return(dbms != "spark")
+}

48 changes: 17 additions & 31 deletions R/InsertTable.R
@@ -85,18 +85,6 @@ validateInt64Insert <- function() {
}
}


-trySettingAutoCommit <- function(connection, value) {
-  tryCatch(
-    {
-      rJava::.jcall(connection@jConnection, "V", "setAutoCommit", value)
-    },
-    error = function(cond) {
-      # do nothing
-    }
-  )
-}

#' Insert a table on the server
#'
#' @description
@@ -213,6 +201,7 @@ insertTable.default <- function(connection,
connection <- pool::poolCheckout(connection)
on.exit(pool::poolReturn(connection))
}
+  dbms <- dbms(connection)
if (!is.null(useMppBulkLoad) && useMppBulkLoad != "") {
warn("The 'useMppBulkLoad' argument is deprecated. Use 'bulkLoad' instead.",
.frequency = "regularly",
@@ -238,7 +227,7 @@
if (dropTableIfExists) {
createTable <- TRUE
}
-  if (tempTable & substr(tableName, 1, 1) != "#" & dbms(connection) != "redshift") {
+  if (tempTable & substr(tableName, 1, 1) != "#" & dbms != "redshift") {
if (tempTable & substr(tableName, 1, 1) != "#" & dbms != "redshift") {
tableName <- paste("#", tableName, sep = "")
}
if (!is.null(databaseSchema)) {
Expand All @@ -264,10 +253,10 @@ insertTable.default <- function(connection,
}
data <- convertLogicalFields(data)
isSqlReservedWord(c(tableName, colnames(data)), warn = TRUE)
-  useBulkLoad <- (bulkLoad && dbms(connection) %in% c("hive", "redshift") && createTable) ||
-    (bulkLoad && dbms(connection) %in% c("pdw", "postgresql") && !tempTable)
-  useCtasHack <- dbms(connection) %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0 && !useBulkLoad
-  if (dbms(connection) == "bigquery" && useCtasHack && is.null(tempEmulationSchema)) {
+  useBulkLoad <- (bulkLoad && dbms %in% c("hive", "redshift") && createTable) ||
+    (bulkLoad && dbms %in% c("pdw", "postgresql") && !tempTable)
+  useCtasHack <- dbms %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0 && !useBulkLoad
+  if (dbms == "bigquery" && useCtasHack && is.null(tempEmulationSchema)) {
abort("tempEmulationSchema is required to use insertTable with bigquery when inserting into a new table")
}

@@ -288,7 +277,7 @@
)
}

-  if (createTable && !useCtasHack && !(bulkLoad && dbms(connection) == "hive")) {
+  if (createTable && !useCtasHack && !(bulkLoad && dbms == "hive")) {
sql <- paste("CREATE TABLE ", sqlTableName, " (", sqlTableDefinition, ");", sep = "")
renderTranslateExecuteSql(
connection = connection,
@@ -305,13 +294,13 @@
abort("Bulk load credentials could not be confirmed. Please review them or set 'bulkLoad' to FALSE")
}
inform("Attempting to use bulk loading...")
-    if (dbms(connection) == "redshift") {
+    if (dbms == "redshift") {
bulkLoadRedshift(connection, sqlTableName, data)
-    } else if (dbms(connection) == "pdw") {
+    } else if (dbms == "pdw") {
bulkLoadPdw(connection, sqlTableName, sqlDataTypes, data)
-    } else if (dbms(connection) == "hive") {
+    } else if (dbms == "hive") {
bulkLoadHive(connection, sqlTableName, sqlFieldNames, data)
-    } else if (dbms(connection) == "postgresql") {
+    } else if (dbms == "postgresql") {
bulkLoadPostgres(connection, sqlTableName, sqlFieldNames, sqlDataTypes, data)
}
} else if (useCtasHack) {
@@ -335,25 +324,22 @@
")"
)
insertSql <- SqlRender::translate(insertSql,
-    targetDialect = dbms(connection),
+    targetDialect = dbms,
tempEmulationSchema = tempEmulationSchema
)
batchSize <- 10000

-  autoCommit <- rJava::.jcall(connection@jConnection, "Z", "getAutoCommit")
-  if (autoCommit) {
-    trySettingAutoCommit(connection, FALSE)
-    on.exit(trySettingAutoCommit(connection, TRUE))
-  }
if (nrow(data) > 0) {
if (progressBar) {
pb <- txtProgressBar(style = 3)
}
batchedInsert <- rJava::.jnew(
"org.ohdsi.databaseConnector.BatchedInsert",
connection@jConnection,
connection@dbms,
insertSql,
-      ncol(data)
+      ncol(data),
+      supportsAutoCommit(dbms)
)
for (start in seq(1, nrow(data), by = batchSize)) {
if (progressBar) {
@@ -373,7 +359,7 @@
} else if (is.numeric(column)) {
rJava::.jcall(batchedInsert, "V", "setNumeric", i, column)
} else if (is(column, "POSIXct") | is(column, "POSIXt")) {
-          rJava::.jcall(batchedInsert, "V", "setDateTime", i, as.character(column))
+          rJava::.jcall(batchedInsert, "V", "setDateTime", i, format(column, format="%Y-%m-%d %H:%M:%S"))
} else if (is(column, "Date")) {
rJava::.jcall(batchedInsert, "V", "setDate", i, as.character(column))
} else {
@@ -382,7 +368,7 @@
return(NULL)
}
lapply(1:ncol(data), setColumn, start = start, end = end)
if (dbms(connection) == "bigquery") {
if (dbms == "bigquery") {
if (!rJava::.jcall(batchedInsert, "Z", "executeBigQueryBatch"))
stop("Error uploading data")
} else {
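A quick illustration of the setDateTime change above, with arbitrary values: as.character() on a POSIXct can drop the time-of-day when it is exactly midnight, while the explicit format() always emits a full timestamp, which presumably is what fixes the Snowflake POSIXct insert noted in NEWS.md:

    x <- as.POSIXct("2023-03-15 00:00:00", tz = "UTC")
    as.character(x)                           # "2023-03-15" (midnight is dropped)
    format(x, format = "%Y-%m-%d %H:%M:%S")  # "2023-03-15 00:00:00"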
2 changes: 2 additions & 0 deletions R/RStudio.R
@@ -307,6 +307,8 @@ getSchemaNames.DatabaseConnectorDbiConnection <- function(conn, catalog = NULL)
} else if (dbms(conn) == "spark") {
schemas <- DBI::dbGetQuery(conn@dbiConnection, "SHOW DATABASES")
return(schemas[, 1])
+  } else if (conn@dbms == "duckdb") {
+    return(dbGetQuery(conn, "SELECT schema_name FROM information_schema.schemata")$schema_name)
} else {
schemas <- DBI::dbGetQuery(conn@dbiConnection, "SELECT schema_name FROM information_schema.schemata;")
return(schemas[, 1])

