Shixiang Wang

>上士闻道
勤而行之

R-操作数据库

王诗翔 · 2018-08-19

分类: r  
标签: r   SQL   SQLite   MySQL   MongoDB   Redis  

在阅读之前我有必要进行申明,因为使用的是RMD文档,所以每次RENDER的时候都会读取数据库,所以结果显示不一样正常,有的也加了overwriteappend选项。显示的结果不一定与实际读者操作的结果一致。

内容:

操作关系型数据库

关系型数据库就是一个由表和表之间的关系组成的集合。数据库中的表和R中的数据框有相同地形式。表之间可以互相关联,让我们能够轻松合并多张表信息。

下面从最简单数据库开始,SQLite(http://sqlite.org/是一个轻量级的数据库引擎。

在R中操作SQLite数据库需要用RSQLite扩展包。运行下面代码:

if(!require("RSQLite")) install.packages("RSQLite")
#> Loading required package: RSQLite

创建SQLite数据库

现在创建一个 db 目录,用来存储数据库:

if (!dir.exists("../../../static/datasets/db/")) dir.create("../../../static/datasets/db/")

接下来载入RSQLite包,提供数据库驱动SQLite()和数据库文件example.sqlite建立连接。如果目标文件不存在,数据库驱动会创建一个新的空文件,即空SQLite数据库:

library(RSQLite)
con = dbConnect(SQLite(), "../../../static/datasets/db/example.sqlite")

数据库连接con是介于用户和系统中间的一层。我们可以创建一个连接,并且连接到关系型数据库,并通过这个连接实现查询、抽取以及更新数据。后续将一直使用这个连接,直到这个连接被关闭。

我们可以在关系数据库中创建新表,表看起来和R里的数据框是一样的。

下面创建一个数据框,并将它作为表插入数据库中。

example1 = data.frame(
    id = 1:5,
    type = c("A", "A", "B", "B", "C"),
    score = c(8, 9, 8, 10, 9),
    stringsAsFactors = FALSE
)
example1
#>   id type score
#> 1  1    A     8
#> 2  2    A     9
#> 3  3    B     8
#> 4  4    B    10
#> 5  5    C     9

现在进行写入:

dbWriteTable(con, "example1", example1, overwrite = TRUE)

接下来使用dbDisconnect()断开数据库连接,这样con便不可用了。

dbDisconnect(con)

向一个数据库写入多张表格

现在我们将ggplot2diamonds数据集和nycflights13中的flights数据集作为两张表格写入数据库中。

if(!require("ggplot2")) install.packages("ggplot2")
#> Loading required package: ggplot2
if(!require("nycflights13")) install.packages("nycflights13")
#> Loading required package: nycflights13
data("diamonds", package = "ggplot2")
data("flights", package = "nycflights13")

我们重复之前的操作流程:

con = dbConnect(SQLite(), "../../../static/datasets/db/datasets.sqlite")
dbWriteTable(con, "diamonds", diamonds, row.names = FALSE, overwrite = TRUE)
dbWriteTable(con, "flights", flights, row.names = FALSE, overwrite = TRUE)
dbDisconnect(con)

这里使用overwrite=TRUE的目的是防止多次添加同一张表报错。

现在数据库中有两张表了。

向表中追加数据

下面先生成几个数据块,然后增加到数据库的表中:

con = dbConnect(SQLite(), "../../../static/datasets/db/example2.sqlite")
# 先移除之前的表
# dbRemoveTable(con, "products")
chunk_size = 10
id = 0
for (i in 1:6){
    chunk = data.frame(id = ((i - 1L) * chunk_size):(i * chunk_size -1L),
                       type = LETTERS[[i]],
                       score = rbinom(chunk_size, 10, (10-i)/10),
                       stringsAsFactors = FALSE)
    dbWriteTable(con, "products", chunk,
                 append = i > 1, row.names = FALSE)
}
dbDisconnect(con)

每次代码块都生成一个数据框,包含一些确定数据和随机数,我们将这些数据记录追加到一个名为products的表中。

访问表和表中字段

当有了数据库和数据,我们可以进行数据访问。比如所有表的名字或某个表的列。

con = dbConnect(SQLite(), "../../../static/datasets/db/datasets.sqlite")

使用dbExistsTable()可以检查数据库是否存在某张表:

dbExistsTable(con, "diamonds")
#> [1] TRUE
dbExistsTable(con, "mtcars")
#> [1] FALSE

下面列出该数据库的所有表:

dbListTables(con)
#> [1] "diamonds" "flights"

对于某一张表,我们可以使用dbListFields()列出表的列名(或字段):

dbListFields(con, "diamonds")
#>  [1] "carat"   "cut"     "color"   "clarity" "depth"   "table"   "price"  
#>  [8] "x"       "y"       "z"

dbWriteTable()相反,adReadTable()将表格读入为数据框:

db.diamonds = dbReadTable(con, "diamonds")
dbDisconnect(con)
head(db.diamonds)
#>   carat       cut color clarity depth table price    x    y    z
#> 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
#> 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
#> 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
#> 4  0.29   Premium     I     VS2  62.4    58   334 4.20 4.23 2.63
#> 5  0.31      Good     J     SI2  63.3    58   335 4.34 4.35 2.75
#> 6  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48
str(db.diamonds)
#> 'data.frame':    53940 obs. of  10 variables:
#>  $ carat  : num  0.23 0.21 0.23 0.29 0.31 0.24 0.24 0.26 0.22 0.23 ...
#>  $ cut    : chr  "Ideal" "Premium" "Good" "Premium" ...
#>  $ color  : chr  "E" "E" "E" "I" ...
#>  $ clarity: chr  "SI2" "SI1" "VS1" "VS2" ...
#>  $ depth  : num  61.5 59.8 56.9 62.4 63.3 62.8 62.3 61.9 65.1 59.4 ...
#>  $ table  : num  55 61 65 58 58 57 57 55 61 61 ...
#>  $ price  : int  326 326 327 334 335 336 336 337 337 338 ...
#>  $ x      : num  3.95 3.89 4.05 4.2 4.34 3.94 3.95 4.07 3.87 4 ...
#>  $ y      : num  3.98 3.84 4.07 4.23 4.35 3.96 3.98 4.11 3.78 4.05 ...
#>  $ z      : num  2.43 2.31 2.31 2.63 2.75 2.48 2.47 2.53 2.49 2.39 ...

用SQL对关系数据库进行查询

这部分我们学习如何根据需求对数据库进行查询,进而从中获取数据。SQL是重点,中文叫结构化查询语句。

首先与数据库建立连接:

con = dbConnect(SQLite(), "../../../static/datasets/db/datasets.sqlite")
dbListTables(con)
#> [1] "diamonds" "flights"

使用dbGetQuery()我们可以将SQL查询语句作为参数输入,select语句可以帮助我们选取数据:

db_diamonds = dbGetQuery(con, 
                         "select * from diamonds")
head(db_diamonds)
#>   carat       cut color clarity depth table price    x    y    z
#> 1  0.23     Ideal     E     SI2  61.5    55   326 3.95 3.98 2.43
#> 2  0.21   Premium     E     SI1  59.8    61   326 3.89 3.84 2.31
#> 3  0.23      Good     E     VS1  56.9    65   327 4.05 4.07 2.31
#> 4  0.29   Premium     I     VS2  62.4    58   334 4.20 4.23 2.63
#> 5  0.31      Good     J     SI2  63.3    58   335 4.34 4.35 2.75
#> 6  0.24 Very Good     J    VVS2  62.8    57   336 3.94 3.96 2.48

这里*代表所有的字段,如果仅需要字段的一个子集,可以依次列出列名:

db_diamonds = dbGetQuery(con,
                         "select carat, cut, color, clarity, 
                         depth, price
                         from diamonds")
head(db_diamonds)
#>   carat       cut color clarity depth price
#> 1  0.23     Ideal     E     SI2  61.5   326
#> 2  0.21   Premium     E     SI1  59.8   326
#> 3  0.23      Good     E     VS1  56.9   327
#> 4  0.29   Premium     I     VS2  62.4   334
#> 5  0.31      Good     J     SI2  63.3   335
#> 6  0.24 Very Good     J    VVS2  62.8   336

**如果想要选取数据中所有不重复的值,使用select distinct。例如:

dbGetQuery(con, "select distinct cut from diamonds")
#>         cut
#> 1     Ideal
#> 2   Premium
#> 3      Good
#> 4 Very Good
#> 5      Fair

注意,dbGetQuery()总是返回一个数据框

当使用select选择列查询时,原表列名可能不合意,使用A as B可以将A列名修改为B

db_diamonds = dbGetQuery(con,
                         "select carat, price, clarity as clarity_level from diamonds")
head(db_diamonds)
#>   carat price clarity_level
#> 1  0.23   326           SI2
#> 2  0.21   326           SI1
#> 3  0.23   327           VS1
#> 4  0.29   334           VS2
#> 5  0.31   335           SI2
#> 6  0.24   336          VVS2

有时候我们需要对查询的值进行计算得到需要的列,也可以使用A as B的形式:

db_diamonds = dbGetQuery(con,
                         "select carat, price, x * y * z as size
                         from diamonds")
head(db_diamonds, 3)
#>   carat price size
#> 1  0.23   326 38.2
#> 2  0.21   326 34.5
#> 3  0.23   327 38.1

如果要用现有的列生成一个新列,再用新列生成另一个列,该怎么办?

db_diamonds = dbGetQuery(con,
                         "select carat, price, x * y * z as size,
                         price / size as value_density
                         from diamonds")
#> Error: no such column: size

上面的做法是错误的,A as B形式中A必须由已存在的列构成。我们可以通过内嵌的select语句产生一个临时表解决这样的需求:

db_diamonds = dbGetQuery(con, 
                         "select *, price / size as value_density from 
                         (select carat, price, x * y * z as size
                         from diamonds)")
head(db_diamonds, 3)
#>   carat price size value_density
#> 1  0.23   326 38.2          8.53
#> 2  0.21   326 34.5          9.45
#> 3  0.23   327 38.1          8.59

数据库查询的另一个重要部分是条件查询,我们用where指明查询结果应满足的条件。

例如,选择cut值为Good的钻石数据:

good_diamonds = dbGetQuery(con,
                           "select carat, cut, price from diamonds
                           where cut = 'Good'")
head(good_diamonds,3)
#>   carat  cut price
#> 1  0.23 Good   327
#> 2  0.31 Good   335
#> 3  0.30 Good   339

如果查询需要同时满足多个条件,使用and进行连结

good_e_diamonds = dbGetQuery(con, 
                             "select carat, cut, color, price from diamonds
                             where cut = 'Good' and color = 'E'")
head(good_e_diamonds, 3)
#>   carat  cut color price
#> 1  0.23 Good     E   327
#> 2  0.23 Good     E   402
#> 3  0.26 Good     E   554
nrow(good_e_diamonds) / nrow(diamonds)
#> [1] 0.0173

我们还可以通过in检查字段值是否包含在给定集合中:

color_ef_diamonds = dbGetQuery(con,
                               "select carat, cut, color, price from diamonds 
                               where color in ('E', 'F')")
nrow(color_ef_diamonds) 
#> [1] 19339

使用in语句时指定一个集合,使用between ... and指定一个区间,这个区间不一定是数值型的,只要数据类型可比即可。

some_price_diamonds = dbGetQuery(con, 
                                 "select carat, cut, color, price from diamonds
                                 where price between 5000 and 5500")
nrow(some_price_diamonds)
#> [1] 1772

还有个有用的运算符like可用于字符串数据,它可以筛选具有某个模式的字段。

例如选择cut变量以Good结尾的记录,它可以是GoodVeryGood,我们用like '%Good',这里%可匹配任意字符串。

good_cut_diamonds = dbGetQuery(con,
                               "select carat, cut, color, price from diamonds 
                               where cut like '%Good'")
nrow(good_cut_diamonds)
#> [1] 16988

数据库查询还有一个重要功能,即按照指定字段重新排列数据,可以用order by实现。

例如,以price字段升序排列:

cheapest_diamonds = dbGetQuery(con,
                               "select carat, price from diamonds
                               order by price")
head(cheapest_diamonds)
#>   carat price
#> 1  0.23   326
#> 2  0.21   326
#> 3  0.23   327
#> 4  0.29   334
#> 5  0.31   335
#> 6  0.24   336

在排序字段时加desc就可以降序排列,因此可以得到一个与上面相反的数据框:

most_expensive_diamonds = dbGetQuery(con,
                                     "select carat, price from diamonds
                                     order by price desc")
head(most_expensive_diamonds)
#>   carat price
#> 1  2.29 18823
#> 2  2.00 18818
#> 3  1.51 18806
#> 4  2.07 18804
#> 5  2.00 18803
#> 6  2.29 18797

同样的道理我们可以根据多个字段进行排序:

cheapest_diamonds = dbGetQuery(con,
                               "select carat, price from diamonds 
                               order by price, carat desc")
head(cheapest_diamonds)
#>   carat price
#> 1  0.23   326
#> 2  0.21   326
#> 3  0.23   327
#> 4  0.29   334
#> 5  0.31   335
#> 6  0.24   336

用于排序的列可以是根据已有列计算生成的:

dense_diamonds = dbGetQuery(con,
                            "select carat, price, x*y*z as size from diamonds
                            order by carat / size desc")
head(dense_diamonds)
#>   carat price  size
#> 1  1.07  5909  47.2
#> 2  1.41  9752  74.4
#> 3  1.53  8971  85.3
#> 4  1.51  7188 133.1
#> 5  1.22  3156 108.2
#> 6  1.12  6115 101.0

whereorder by可以连用得到排序的子集:

head(dbGetQuery(con,
                "select carat, price from diamonds
                where cut = 'Ideal' and clarity = 'IF' and color = 'J'
                order by price"))
#>   carat price
#> 1  0.30   489
#> 2  0.30   489
#> 3  0.32   521
#> 4  0.32   533
#> 5  0.32   533
#> 6  0.35   569

如果只关心前几行,可以用limit限制记录条数“

dbGetQuery(con,
           "select carat, price from diamonds
           order by carat desc limit 3")
#>   carat price
#> 1  5.01 18018
#> 2  4.50 18531
#> 3  4.13 17329

除了字段选择、条件筛选和排序,我们还可以在数据库中对记录进行分组聚合。

例如计算每种颜色的记录条数:

dbGetQuery(con,
           "select color, count(*) as number from diamonds
           group by color")
#>   color number
#> 1     D   6775
#> 2     E   9797
#> 3     F   9542
#> 4     G  11292
#> 5     H   8304
#> 6     I   5422
#> 7     J   2808

除了汇总计数,还有avg(),max(),min()sum()等聚合函数。

比如计算钻石不同透明度的平均价格:

dbGetQuery(con,
           "select clarity, avg(price) as avg_price 
           from diamonds
           group by clarity
           order by avg_price desc")
#>   clarity avg_price
#> 1     SI2      5063
#> 2     SI1      3996
#> 3     VS2      3925
#> 4      I1      3924
#> 5     VS1      3839
#> 6    VVS2      3284
#> 7      IF      2865
#> 8    VVS1      2523

在最低5个价格水平下,能买到最大的克拉数是多少?

dbGetQuery(con,
           "select price, max(carat) as max_carat
           from diamonds
           group by price
           order by price limit 5")
#>   price max_carat
#> 1   326      0.23
#> 2   327      0.23
#> 3   334      0.29
#> 4   335      0.31
#> 5   336      0.24

可以在组内进行多个运算。下面代码计算每个透明度下的价格区间和价格平均值:

dbGetQuery(con,
           "select clarity,
           min(price) as min_price,
           max(price) as max_price,
           avg(price) as avg_price
           
           from diamonds
           group by clarity
           order by avg_price desc")
#>   clarity min_price max_price avg_price
#> 1     SI2       326     18804      5063
#> 2     SI1       326     18818      3996
#> 3     VS2       334     18823      3925
#> 4      I1       345     18531      3924
#> 5     VS1       327     18795      3839
#> 6    VVS2       336     18768      3284
#> 7      IF       369     18806      2865
#> 8    VVS1       336     18777      2523

下面用重量进行加权,计算不同透明度水平下每克拉钻石的平均价格:

dbGetQuery(con, 
           "select clarity,
           sum(price * carat) / sum(carat) as wprice
           from diamonds
           group by clarity
           order by wprice desc")
#>   clarity wprice
#> 1     SI2   7012
#> 2     VS2   6174
#> 3     VS1   6060
#> 4     SI1   5919
#> 5    VVS2   5470
#> 6      I1   5234
#> 7      IF   5125
#> 8    VVS1   4389

关系型数据中,最能体现关系概念的运算是表的连接(join),即将若干表通过某些字段连接起来。

举例,创建一个新的数据框,包含字段cut,colorclarity共3条记录,之后我们根据这3条记录筛选数据:

diamond_selector = data.frame(
    cut = c("Ideal", "Good", "Fair"),
    color = c("E", "I", "D"),
    clarity = c("VS1", "T1", "IF"),
    stringsAsFactors = FALSE
)
diamond_selector
#>     cut color clarity
#> 1 Ideal     E     VS1
#> 2  Good     I      T1
#> 3  Fair     D      IF

现在写入数据库,然后连接diamonds表和diamond_selector表,选择合适的数据:

dbWriteTable(con, "diamond_selector", diamond_selector,
             row.names = FALSE, overwrite = TRUE)

通过连接子句join-clause声明要匹配的列:

subset_diamonds = dbGetQuery(con, 
                             "select cut, color, clarity, carat, price
                             from diamonds
                             join diamond_selector using (cut, color, clarity)")
head(subset_diamonds)
#>     cut color clarity carat price
#> 1 Ideal     E     VS1  0.60  2774
#> 2 Ideal     E     VS1  0.26   556
#> 3 Ideal     E     VS1  0.70  2818
#> 4 Ideal     E     VS1  0.70  2837
#> 5 Ideal     E     VS1  0.26   556
#> 6 Ideal     E     VS1  0.26   556

最后不要忘记断开数据库连接,以确保所有资源被正确释放:

dbDisconnect(con)

更多SQL用法,访问http://www/w3school.com/sql

分块提取查询结果

通常,我们只提取数据库的一个子集进行研究。然后有时候我们需要检查的数据量还是超过了计算机内存容量。因此必须逐块处理。

接下来我们使用dbSendQuery()进行查询,而不是dbGetQuery(),然后我们重复地从查询结果中取回一块数据,直到取回所有的查询结果。

con = dbConnect(SQLite(), "../../../static/datasets/db/datasets.sqlite")
res = dbSendQuery(con,
                  "select carat, cut, color, price from diamonds
                  where cut = 'Ideal' and color = 'E' ") 
while(!dbHasCompleted(res)) {
    chunk = dbFetch(res, 800)
    cat(nrow(chunk), "records fetched\n")
}
#> 800 records fetched
#> 800 records fetched
#> 800 records fetched
#> 800 records fetched
#> 703 records fetched

清理结果并关闭连接:

dbClearResult(res)
dbDisconnect(con)

当实际处理的数据过大时,这样处理不失为一种好的办法。

出于一致性考虑的事务操作

当我们插入或更新数据时,是通过事务实现的。其中事务是对数据库操作的逻辑单位,事务操作有两种:提交(将数据库所做的修改永久写入数据库)和回滚(将数据库所做的修改全部撤销,数据库还原到操作前的状态)

如果一个事务操作失败了,我们可以撤销并回滚,以保证数据的一致性。

下面用一个例子来简单模拟一次数据的累积和出错过程。

set.seed(123)
con = dbConnect(SQLite(), "../../../static/datasets/db/products.sqlite")
chunk_size = 10
for (i in 1:6){
    cat("Processing chunk", i, "\n")
    if(runif(1) <= 0.2) stop("Data error")
    
    chunk = data.frame(id = ((i - 1L) * chunk_size) : (i * chunk_size - 1L),
                        type = LETTERS[[i]],
                        score = rbinom(chunk_size, 10, (10 -i ) /10),
                       stringsAsFactors = FALSE)
    dbWriteTable(con, "products", chunk, append = i > 1, row.names = FALSE)
}
#> Processing chunk 1 
#> Processing chunk 2 
#> Processing chunk 3 
#> Processing chunk 4 
#> Processing chunk 5
#> Error in eval(expr, envir, enclos): Data error

该过程在处理第5个数据处理时出错。我们计算一下表中的记录数:

dbGetQuery(con, "select COUNT(*) from products")
#>   COUNT(*)
#> 1       40
dbDisconnect(con)

这个时候很尴尬——存储了一部分正确的信息,但有没有完全对。一般这时我们希望只有两种结果:要么正确存储所有数据,要么不存入任何数据,这两个结果都考研保证数据库的一致性

为了确保对数据库的一系列变更能够作为一个整体,我们在写入任何数据前都调用dbBegin(),待所有变更完成后,再调用dbCommit(),如果这个过程出现了错误,就调用dbRollback()

接下来的代码是上个例子的增强版,实现了上一段话所说的流程。请读者务必谨记这一思想与过程。

set.seed(123)
file.remove("../../../static/datasets/db/products.sqlite")
#> [1] TRUE
con = dbConnect(SQLite(), "../../../static/datasets/db/products.sqlite")
chunk_size = 10
dbBegin(con)
fes = tryCatch({
    for (i in 1:6){
        cat("Processing chunk", i, "\n")
        if(runif(1) <= 0.2) stop("Data error")
        chunk = data.frame(id = ((i - 1L) * chunk_size) : (i * chunk_size - 1L),
                        type = LETTERS[[i]],
                        score = rbinom(chunk_size, 10, (10 -i ) /10),
                       stringsAsFactors = FALSE)
        dbWriteTable(con, "products", chunk, append = i > 1, row.names = FALSE)
        }
    dbCommit(con)
}, error = function(e){
        warning("An error occurs: ", e, "\nRolling back", immediate. = TRUE)
        dbRollback(con)
    }
)
#> Processing chunk 1 
#> Processing chunk 2 
#> Processing chunk 3 
#> Processing chunk 4 
#> Processing chunk 5
#> Warning in value[[3L]](cond): An error occurs: Error in doTryCatch(return(expr), name, parentenv, handler): Data error
#> 
#> Rolling back

我们可以发现成功捕捉到了错误,此时再验证一下,数据库应当没有products表:

dbGetQuery(con, "select COUNT(*) from products")
#> Error: no such table: products
dbDisconnect(con)

要求数据之间具有一致性的另一个例子是账户转移。当我们将一笔资金从一个账户转移到另一个账户时,必须确保系统从一个账户提取资金,同时向另一账户存入等额资金。这两个变动要么同时发生,要么同时都失败,以保证一致性。对于这样的问题,利用关系型数据库的事务操作可以轻松实现。

下面创建一个函数设定一个虚拟银行的SQLite数据库,调用dbSendQuery()发送命令,创建accounts表(账户)和transactions表(交易)。

create_bank = function(dbfile){
    if(file.exists(dbfile)) file.remove(dbfile)
    con = dbConnect(SQLite(), dbfile)
    dbSendQuery(con,
                "create table accounts
                (name text primarykey key, balance real)")
    dbSendQuery(con,
                "create table transactions
                (time text, account_from text, account_to text, value real)")
    con
}

accounts表具有2列:namebalancetransaction表有4列:timeaccount_fromaccount_tovalue

第1张表格存储所有的账户信息,第2张存储所有的历史交易信息。

另外我们再创建一个函数用于设定带账户名和初始余额的账户,它用insert intoaccounts表写入新记录:

create_account = function(con, name, balance){
    dbSendQuery(con,
                sprintf("insert into accounts (name, balance) values ('%s', %.2f)", name, balance))
    TRUE
}

这里我们使用sprintf()产生之前的SQL语句。

接着我们写一个转账函数,用于检查数据库中是否同时存在取帐用户和收账用户,确保取帐用户的余额足够完成转账请求。一旦转账有效,它会更新两个账户的余额,并向数据库中添加一条交易记录:

transfer = function(con, from, to, value){
    get_account = function(name){
        account = dbGetQuery(con,
                             sprintf("select * from accounts
                                     where name = '%s' ", name))
        if(nrow(account) == 0){
            stop(sprintf("Account '%s' does not exist", name))
        }
        account
    }
    
    account_from = get_account(from)
    account_to = get_account(to)
    
    if (account_from$balance < value) {
        stop(sprintf("Insufficient money to transfer from '%s'", from))
    }else{
        dbSendQuery(con,
            sprintf("update accounts set balance = %.2f
                    where name = '%s' ",
                    account_from$balance - value, from))
        dbSendQuery(con,
                    sprintf("update accounts set balance = %.2f
                            where name = '%s' ",
                            account_to$balance + value, to))
        dbSendQuery(con,
                    sprintf("insert into transactions (time, account_from, 
                            account_to, value)
                            values ('%s', '%s', '%s', %.2f)", 
                    format(Sys.time(), "%Y-%m-%d %H:%M:%S"), from, to, value))
    }
    TRUE
}

尽管我们已经考虑到了一致性的问题也在函数中做了检查,但仍然有其他可能的风险,因此我们实现transfer()的一种安全版本,利用事务操作确保只要转账出现任何错误,就撤销transfer()的一切更改。

safe_transfer = function(con, ...){
    dbBegin(con)
    tryCatch({
        transfer(con, ...)
        dbCommit(con)
    }, error = function(e){
        message("An error occurs in the transcation. Rollback...")
        dbRollback(con)
        stop(e)
    })
}

这里safe_transfer()transfer()的一个封装,它将修改放入了安全的沙箱中以确保数据库的一致性。

在对以上函数进行运行测试前,我们还需要函数来查看给定账户的余额和成功完成的交易信息:

get_balance = function(con, name){
    res = dbGetQuery(con,
                     sprintf("select balance from accounts
                             where name = '%s'", name))
    res$balance
}
get_transactions = function(con, from, to){
    dbGetQuery(con,
               sprintf("select * from transactions
                       where account_from = '%s' and account_to = '%s'",
                       from, to))
}

下面进行测试

首先创建一个虚拟银行,然后创建两个用户并赋予初始余额。

con = create_bank("../../../static/datasets/db/bank.sqlite")
#> Warning: Closing open result set, pending rows
create_account(con, "周丹", 5000000000)
#> Warning: Closing open result set, pending rows
#> [1] TRUE
create_account(con, "诗翔", 30000000000)
#> Warning: Closing open result set, pending rows
#> [1] TRUE

现在查看下我和妹子的虚拟币!

get_balance(con, "周丹")
#> Warning: Closing open result set, pending rows
#> [1] 5e+09
get_balance(con, "诗翔")
#> [1] 3e+10

真有钱,哇咔咔。给妹纸转账:

safe_transfer(con, "诗翔", "周丹", 1000000000)
#> Warning: Closing open result set, pending rows

#> Warning: Closing open result set, pending rows

#> Warning: Closing open result set, pending rows
get_balance(con, "周丹")
#> [1] 6e+09
get_balance(con, "诗翔")
#> [1] 2.9e+10

获取下交易记录:

get_transactions(con, "诗翔", "周丹")
#>                  time account_from account_to value
#> 1 2020-08-09 13:32:54         诗翔       周丹 1e+09

关闭数据库:

dbDisconnect(con)

将多个文件数据存入一个数据库

处理大数据问题常遇到两类问题:一是文本格式数据源非常大,难以载入内存;二是数据分散在许多文件中,需要费力气整合到数据框中。

对于第一类问题的方案在前面已经演示过,我们可以逐块地读取数据,并将每块数据追加到数据库的某张表中。下面提供的函数便是为了这个目的设计的,给定输入文件、输出数据库、表名和数据块的容量,该函数向数据库的表中追加记录,只需要很小的工作内存。

chunk_rw = function(input, output, table, chunk_size = 10000){
    first_row = read.csv(input, nrows = 1, header = TRUE)
    header = colnames(first_row)
    
    n = 0
    con = dbConnect(SQLite(), output)
    on.exit(dbDisconnect(con))
    while(TRUE) {
        df = read.csv(input, 
                      skip = 1 + n*chunk_size, nrows = chunk_size,
                      header = FALSE, col.names = header,
                      stringsAsFactors = FALSE)
        if (nrow(df) == 0) break
        dbWriteTable(con, table, df, row.names = FALSE, append = n>0)
        n = n + 1
        cat(sprintf("%d records written\n", nrow(df)))
    }
}

编写该函数的技巧在于正确计算输入文件每个数据块的偏移量。

下面将diamonds写入csv文件再用该函数读取进行测试:

write.csv(diamonds, "../../../static/datasets/db/diamonds.csv", quote = FALSE, row.names = FALSE)
chunk_rw("../../../static/datasets/db/diamonds.csv", "../../../static/datasets/db/diamonds.sqlite", "diamonds")
#> 10000 records written
#> 10000 records written
#> 10000 records written
#> 10000 records written
#> 10000 records written
#> 3940 records written

另一种情况我们可以将所有分布的文件写入到一个数据库以便轻松地实现查询。

batch_rw = function(dir, output, table, overwrite = TRUE){
    files = list.files(dir, "\\.csv$", full.names = TRUE)
    con = dbConnect(SQLite(), output)
    
    on.exit(dbDisconnect(con))
    
    exit = dbExistsTable(con, table)
    if(exit) {
        if (overwrite) dbRemoveTable(con, table)
        else stop(sprintf("Table '%s' already exists", table))
    }
    
    exist = FALSE
    for (file in files){
        cat(file, "...")
        df = read.csv(file, header = TRUE, stringsAsFactors = FALSE)
        dbWriteTable(con, table, df, row.names = FALSE, append = exist)
        exist = TRUE
        cat("done\n")
    }
}

该函数可以读取指定目录下一组.csv文件,因为手边没有合适的数据,就不测试了。

前面介绍的SQLite数据库知识和基本用法可以推广到其他关系型数据库的使用中去。例如,通过RMySQL操作MySQL,通过RPostges操作PostreSQL,通过SQLServer操作Microsoft SQL,或者通过RODBC操作ODBC兼容数据库(Microsoft Access和Excel)。它们的操作方法类似,因此掌握一种,其他也就不成问题。

操作非关系型数据库

关系型数据库主要是以表的形式组织,即它们是相互之间具有关联的表的集合。

然而,当数据量超出服务器的承载容量时,新问题产生了,此时数据需要以分布式形式存储,同时又可以仍然保持一个逻辑数据库来进行访问。

近些年,新数据库模型的引入和其在大数据分析与实时应用中的出色表现,是的NoSQL开始流行。关系型数据库和非关系型数据库在存储模型方面的差别是显而易见的。比如我们可以将一个购物网站的商品和评论信息存储在一个具有两张表的关系型数据库中。

当一个商品具有许多相关的表和海量记录时,数据库必须分给服务器群,但这会增加数据查询的难度,因为即使运行一个简单的查询也是极度低效。如果MongoDB存储这样的数据,每个商品被存储为一个文档,该商品的所有评论会以数组的形式存储在该文档的一个字段中。如此一来,数据的查询就容易多了。

MongoDB操作

MongoDB是一种流行的非关系型数据库,它提供了一种面向文档的数据存储方式。每个商品就是集合的一份文档,商品具有一些描述字段和一个数组类型的评论字段。所有的评论都是一个子文档,因此每个逻辑项可以用子集的逻辑形式存储。

关系型数据库可能具有许多模式,每种模式(数据库)可以由多张表组成,每张表可能含有多条记录。相似地,一个MongoDB实例可以搭建多个数据库。每个数据库可以存在多个集合,每个集合内可能有多个文档。二者的主要区别在于,关系型数据库中,一张表的所有记录具有相同的结构,但MongoDB数据库集合内的文档却没有模式限制,可以灵活地实现嵌套结构。

安装MongoDB,请访问https://docs.mongodb.com/manual/installation/,它几乎支持所有的主流平台,按照说明操作即可。

用MongoDB查询数据

假设我们已经在电脑上按照好了MongoDB,可以在R中使用mongolite扩展包操作MongoDB。

install.packages("mongolite")
library(mongolite)
m = mongo("products", "test", "mongodb://localhost")

我的黑MAC上用homebrew安装不了MongoDB(访问不了谷歌的API下载程序),所以后续就不学了。

感觉现在重要的还是学习关系数据库,自己用起来,掌握SQL语法。