R und Datenbanken

sqlite, postgresql und oracle

Diego de Castillo
Oracle DBA, IT Data Management, NetCologne

Warum überhaupt Datenbanken?

  • DBs sind typenkonform; Number, Character, Date, BLOB u.v.m.
  • Indizierter Zugriff auf selektive Daten
  • kein Medienbruch
  • Unterstützt parallele zeitgleiche Zugriffe (Mehrbenutzerzugriffe)
  • Konsistenzsicherheit / Datenintegrität
  • Vereinfachtes Verknüpfen von Daten (Joins)

Beispielsdatensatz : utils und data.table

system.time({
    df <- read.csv("data/p1.csv", sep = ";")
})
##    user  system elapsed 
## 177.662   2.764 181.012
system.time({
    df2 <- fread("data/p1.csv")
})
##    user  system elapsed 
##   4.519   0.274   5.747

Beispielsdatensatz : Mengengerüst

dim(df)
## [1] 1000000      29
paste(round(file.info("data/p1.csv")$size/1024/1024), "Mb")
## [1] "306 Mb"
print(object.size(df), units = "auto")
## 408.8 Mb

Beispielsdatensatz : nicht typenkonform

str(df$revision_tag)
str(df2$revision_tag)
##  num [1:1000000] 1 1 1 1 1 1 1 1 1 1 ...
##  chr [1:1000000] "1.0" "1.0" "1.0" "1.0" "1.0" "1.0" ...
cikey process_id status root_id modify_date revision_tag
1 124640243 ExternalActivityFlow Scope_FindOrder 124640242 01.09.2013 00:01:36,228000 1.00
2 124640256 SiebelUpdateFlow Update 124640105 01.09.2013 00:02:10,070000 1.00
3 124640269 SiebelActivityFlow initiated 124640257 01.09.2013 00:20:41,144000 1.00

Also nehmen wir Datenbanken!

Mit Datenbanken sind hier relationale Datenbanksysteme gemeint. Hier betrachten wir:

  • Oracle
  • PostgreSQL
  • SQLite

Generische Pakete, die für viele Datenbank-Typen passen, sind:

  • ODBC
  • JDBC

Für einen vereinheitlichten Zugriff dient S4-Bibliothek:

  • DBI

DBI : library(DBI)

DBI ist eine generische S4-Bibliothek. Die Klassen und Methoden, die zur Verfügung stehen, können von einer Vielzahl von Datenbanksystemen genutzt werden.

Klassen:

  • DBIConnection
  • DBIDriver
  • DBIObject
  • DBIResult

Methoden:

  • dbColumnInfo
  • dbCommit
  • dbConnect
  • dbSendQuery

Der erste Datenbankzugriff

Dieser erfolgt bei einem lesenden Zugriff in 4 Schritten:

  1. Laden und Initialisieren der zugehörigen Treiber: dbDriver()
  2. Die Verbindung herstellen: dbConnect()
  3. Die Anfrage senden: dbSendQuery(con, "select * from p1")
  4. Die Daten holen: fetch(rs)

Oracle : library(ROracle)

library(ROracle)
# laden und initialisieren
drv <- dbDriver("Oracle")
# verbindung herstellen: con
con <- dbConnect(drv, username = "diego", password = "diego", dbname = "ddc11")
# anfrage senden, resultset: rs
rs <- dbSendQuery(con, "select * from p1")
# alle daten holen
system.time({
    data <- fetch(rs, n = -1)
})
##    user  system elapsed 
##  27.322   4.912  58.288

Beispielsdatensatz : aus Datenbank-Tabelle

dim(data)
str(data$REVISION_TAG)
## [1] 1000000      29
##  chr [1:1000000] "1.0" "1.0" "1.0" "1.0" "1.0" "1.0" ...
CIKEY PROCESS_ID STATUS ROOT_ID REVISION_TAG
1 124640243.00 ExternalActivityFlow Scope_FindOrder 124640242 1.0
2 124640256.00 SiebelUpdateFlow Update 124640105 1.0
3 124640269.00 SiebelActivityFlow initiated 124640257 1.0

PostgreSQL : library(RPostgreSQL)

library(RPostgreSQL)
drv <- dbDriver("PostgreSQL")
# connect DBMS mittels passendem Berechtigungsverfahren
con <- dbConnect(drv, dbname = "postgres", user = "diego", password = "diego", 
    host = "ocos")
rs <- dbSendQuery(con, "select * from P1")
system.time({
    data <- fetch(rs, n = -1)
})
##    user  system elapsed 
##  190.69   36.82  237.09

SQLite : library(RSQLite)

library(RSQLite)
drv <- dbDriver("SQLite")
con <- dbConnect(drv, "~/Google Drive/R/KoelnR-2014/data/sqlitedb.sqlite")
rs <- dbSendQuery(con, "select * from P1")
system.time({
    data <- fetch(rs, n = -1)
})
##    user  system elapsed 
##   12.41    3.07   17.42

Java Database Connectivity : library(RJDBC)

JDBC ist eine einheitliche Schnittstelle zu verschiedenen Datenbanken. Es werden für jede Datenbank die zugehörigen JDBC-Treiber benötigt.

library(DBI)
library(rJava)
library(RJDBC)
drv <- JDBC("org.sqlite.JDBC", "~/Google Drive/R/KoelnR-2014/sqlite-jdbc-3.7.2.jar")
con <- dbConnect(drv, "jdbc:sqlite:data/sqlitedb.sqlite")
rs <- dbSendQuery(con, statement = "SELECT * from P1")
data <- fetch(rs, n = 500)

Open Database Connectivity : Konfiguration

odbc.ini

[ODBC Data Sources]
postgres = postgresql

[postgres]
Driver      = PostgreSQL
ServerName   = ocos
Port = 5432
Database   = postgres
UserName     = diego
Password = diego
Protocol = 9.2

odbcinst.ini

[PostgreSQL]
Driver   = /usr/local/lib/psqlodbcw.so

Open Database Connectivity : library(RODBC)

# ohne DBI
library(RODBC)
odbcDataSources()
##     postgres 
## "PostgreSQL"
channel <- odbcConnect("postgres")
data <- sqlQuery(channel, "select * from p1 limit 500")
dim(data)
## [1] 500  29

Kürzer

Um vollständig auf SQL zu verzichten, gibt es auch Funktionen, welche die entsprechenden Statements direkt eingebunden haben:

rs <- dbSendQuery(con, "select * from P1")
data <- fetch(rs)
data <- dbReadTable(con, "P1")

Cleanup : nach dem fetch()

# löscht alle offenen Referenzen zu einem resultset
dbClearResult(rs)
# schließe connection
dbDisconnect(con)
# Treiber wird gelöscht
dbUnloadDriver(drv)

load-extract-transform / split-apply-combine

system.time({
    rb <- do.call("rbind", lapply(split(df$scope_csize, df$process_id), sum))
    rb.df <- data.frame(process_id = rownames(rb), total = rb, row.names = NULL)
    result <- head(rb.df[order(rb.df$total, decreasing = T), ], 5)
})
result
##    user  system elapsed 
##   0.041   0.028   0.106
##                   process_id     total
## 83          SiebelUpdateFlow 308411993
## 28        ExternalStatusFlow 161350929
## 80 SiebelOrderItemUpdateFlow  70485121
## 88            SyncModuleFlow  48834012
## 89             SyncOrderFlow  45176379

load-extract-transform : library(plyr)

gs <- ddply(df, "process_id", summarise, total = sum(scope_csize))
result <- head(arrange(gs, desc(total)), 5)

load-extract-transform : direkt via SQL

drv <- dbDriver("Oracle")
con <- dbConnect(drv, username = "diego", password = "diego", dbname = "ddc11")
stmt <- "select process_id, sum(scope_csize) from p1 group by process_id order by 2 desc"
rs <- dbSendQuery(con, stmt)
system.time({
    data <- fetch(rs, n = 5)
})
data
##    user  system elapsed 
##   0.002   0.001   0.910
##                  PROCESS_ID SUM(SCOPE_CSIZE)
## 1          SiebelUpdateFlow        308411993
## 2        ExternalStatusFlow        161350929
## 3 SiebelOrderItemUpdateFlow         70485121
## 4            SyncModuleFlow         48834012
## 5             SyncOrderFlow         45176379

load-extract-transform : library(dplyr)

library(dplyr)
system.time({
    processes <- group_by(df, process_id)
    gs <- summarise(processes, total = sum(scope_csize))
    result <- arrange(gs, desc(total))
})
head(result, 5)
##    user  system elapsed 
##   0.055   0.033   0.122
## Source: local data frame [5 x 2]
## 
##                  process_id     total
## 1          SiebelUpdateFlow 308411993
## 2        ExternalStatusFlow 161350929
## 3 SiebelOrderItemUpdateFlow  70485121
## 4            SyncModuleFlow  48834012
## 5             SyncOrderFlow  45176379

load-extract-transform : library(dplyr)

Wenn die Datenmenge so groß ist, dass nicht alle Daten in den Speicher passen, kann man mit Teilmengen oder Aggregationen arbeiten.

dplyr funktioniert transparent und arbeitet mit Datenbankzugriffen oder lokalen data.frames, die entsprechenden Funktionen sind die gleichen! Das generische Objekt dafür heißt tbl.

load-extract-transform : dplyr ist lazy

dplyr versucht so lazy wie möglich zu sein:

  1. Es werden Daten nur nach R geholt, wenn man explizit danach fragt.
  2. Der Datenbankzugriff wird so lange wie möglich herausgezögert und erfolgt dann in einem Schritt.

Es gibt 5 entscheidende Daten-Manipulationen in dplyr, diese werden bei Bedarf in SQL übersetzt und laufen dann auf der Datenbank:

  • select
  • filter
  • arrange
  • mutate
  • summarise

load-extract-transform : library(dplyr)

Zuerst wird ein Connect zu einer Datenbank aufgebaut. dplyr unterstützt im Moment postgres, mysql und sqlite.

my_db <- src_postgres(dbname = "postgres", user = "diego", password = "diego", 
    host = "ocos")
my_db
## src:  postgres 8.4.18 [diego@ocos:5432/postgres]
## tbls: cities, p1, test1, x_1000

dplyr-Objekt tbl aus Datenbank-Tabelle

data_db <- tbl(my_db, sql("SELECT * FROM P1"))
data_db
## Source: postgres 8.4.18 [diego@ocos:5432/postgres]
## From: <derived table> [?? x 29]
## 
##        cikey domain_ref         process_id revision_tag
## 1  124642631          0 ExternalStatusFlow          1.0
## 2  124642639          0   SiebelUpdateFlow          1.0
## 3  124642659          0   SiebelUpdateFlow          1.0
## 4  124642663          0   SiebelUpdateFlow          1.0
## 5  124642667          0      SyncOrderFlow          1.0
## 6  124642678          0   SiebelUpdateFlow          1.0
## 7  124642682          0   SiebelUpdateFlow          1.0
## 8  124642685          0   SiebelUpdateFlow          1.0
## 9  124642690          0      SyncOrderFlow          1.0
## 10 124642695          0   ODSOrderSyncFlow          1.0
## ..       ...        ...                ...          ...
## Variables not shown: creation_date (time), creator (chr), modify_date
##   (time), modifier (chr), state (int), priority (int), title (chr), status
##   (chr), stage (chr), conversation_id (chr), root_id (chr), parent_id
##   (chr), scope_revision (int), scope_csize (int), scope_usize (int),
##   process_guid (chr), process_type (int), metadata (chr), ext_string1
##   (chr), ext_string2 (chr), ext_int1 (int), test_run_id (chr), at_count_id
##   (int), at_event_id (int), at_detail_id (int)

load-extract-transform : tbl-Objekt data_db

system.time({
    processesdb <- group_by(data_db, process_id)
    gsdb <- summarise(processesdb, total = sum(scope_csize))
    result <- arrange(gsdb, desc(total))
})
##    user  system elapsed 
##   0.050   0.003   0.053
head(collect(result), 5)
## Source: local data frame [5 x 2]
## 
##                  process_id     total
## 1          SiebelUpdateFlow 308411993
## 2        ExternalStatusFlow 161350929
## 3 SiebelOrderItemUpdateFlow  70485121
## 4            SyncModuleFlow  48834012
## 5             SyncOrderFlow  45176379

Ergebnisse in Datenbank speichern : PDF als BLOB

library(ggplot2)
qp <- qplot(y = process_id, x = total, data = head(collect(result), 5))
ggsave(plot = qp, filename = "qp.pdf")
## Saving 7 x 7 in image
PICOBJ <- paste(readBin("qp.pdf", "raw", n = file.info("qp.pdf")$size), collapse = "")
qp

plot of chunk unnamed-chunk-33

Zuerst eine Connection aufbauen

library(ROracle)
drv <- dbDriver("Oracle")
con <- dbConnect(drv, username = "diego", password = "diego", dbname = "ddc11", 
    prefetch = FALSE, bulk_read = 1000L, stmt_cache = 0L)

INSERT mit dbGetQuery()

insStr <- "insert into plots values(:1, :2, utl_raw.cast_to_raw(:3))"
x <- 1
y <- as.POSIXct("2014-02-22")
z <- "Hierhin kommt das Bild"
df <- data.frame(x, y, z)
dbGetQuery(con, insStr, df)
dbCommit(con)

UPDATE via PL/SQL als RAW BLOB

updateStr <- paste0("DECLARE buf RAW(30000); BEGIN buf := '", PICOBJ, "';", 
    "UPDATE plots SET bin = buf WHERE id = 1; commit;END; ")
dbGetQuery(con, updateStr)

Grafik ist in der Datenbank gespeichert

Vielen Dank !!

Q & A