博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang深入理解mysql数据库驱动
阅读量:3986 次
发布时间:2019-05-24

本文共 14632 字,大约阅读时间需要 48 分钟。

概述

Golang 提供了database/sql包用于对SQL数据库的访问, 作为操作数据库的入口对象sql.DB, 主要为我们提供了两个重要的功能:

sql.DB 通过数据库驱动为我们提供管理底层数据库连接的打开和关闭操作.
sql.DB 为我们管理数据库连接池

需要注意的是,sql.DB表示操作数据库的抽象访问接口,而非一个数据库连接对象;它可以根据driver打开关闭数据库连接,管理连接池。正在使用的连接被标记为繁忙,用完后回到连接池等待下次使用。所以,如果你

没有把连接释放回连接池,会导致过多连接使系统资源耗尽。

通常来说, 不应该直接使用驱动所提供的方法, 而是应该使用 sql.DB, 因此在导入 mysql 驱动时, 这里使用了匿名导入的方式( 在包路径前添加 _ ), 当导入了一个数据库驱动后, 此驱动会自行初始化并注册自己到

Golang的database/sql的驱动map中, 因此我们就可以通过 database/sql 包提供的方法访问数据库了。

注册驱动

import _ "github.com/go-sql-driver/mysql"

driver.go 文件

找到mysql包的init方法

func init() {
sql.Register("mysql", &MySQLDriver{
})}

sql 为 database/sql 包,说明mysql包自身就依赖 database/sql 包。

sql采用map的方式存储驱动。

func Register(name string, driver driver.Driver) {
driversMu.Lock() defer driversMu.Unlock() if driver == nil {
panic("sql: Register driver is nil") } if _, dup := drivers[name]; dup {
panic("sql: Register called twice for driver " + name) } drivers[name] = driver}type Driver interface {
Open(name string) (Conn, error)}

可见驱动需要实现 Open 方法。

type MySQLDriver struct{
}func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
cfg, err := ParseDSN(dsn) if err != nil {
return nil, err } c := &connector{
cfg: cfg, } return c.Connect(context.Background())}

连接数据库

db, err := sql.Open("mysql", cfg)

sql.open 方法:

// Open opens a database specified by its database driver name and a// driver-specific data source name, usually consisting of at least a// database name and connection information.//// Most users will open a database via a driver-specific connection// helper function that returns a *DB. No database drivers are included// in the Go standard library. See https://golang.org/s/sqldrivers for// a list of third-party drivers.//// Open may just validate its arguments without creating a connection// to the database. To verify that the data source name is valid, call// Ping.//// The returned DB is safe for concurrent use by multiple goroutines// and maintains its own pool of idle connections. Thus, the Open// function should be called just once. It is rarely necessary to// close a DB.func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock() driveri, ok := drivers[driverName] driversMu.RUnlock() if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) } if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName) if err != nil {
return nil, err } return OpenDB(connector), nil } return OpenDB(dsnConnector{
dsn: dataSourceName, driver: driveri}), nil}

得到的 *DB 是协程安全的,要确保 Open 方法只被调用一次,避免每个请求过来都去 Open ,在使用过程中更加没必要close a DB对象。

func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background()) db := &DB{
connector: c, openerCh: make(chan struct{
}, connectionRequestQueueSize), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } go db.connectionOpener(ctx) return db}

会启动一个goroutine来异步的创建连接,在后面会提到。

// Runs in a separate goroutine, opens new connections when requested.func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done(): return case <-db.openerCh: db.openNewConnection(ctx) } }}

当收到 创建连接 的消息时执行 db.openNewConnection 去调用驱动的 Connector 对象的 Connect 方法:

ci, err := db.connector.Connect(ctx)

再来看看mysql的connector

func (d MySQLDriver) OpenConnector(dsn string) (driver.Connector, error) {
cfg, err := ParseDSN(dsn) if err != nil {
return nil, err } return &connector{
cfg: cfg, }, nil}

driver.Connector 需要实现两个方法:

type Connector interface {
Connect(context.Context) (Conn, error) Driver() Driver}

Connect方法才是真正发起连接的操作。

由 db.openerCh 通道表明,sql.Open并不会立即检验参数和创建连接,而是等待通知过来才去创建。如果想立即验证连接,需要用Ping()方法。

db.Ping()

Ping完之后自动调用 dc.releaseConn 释放了连接。

sql.DB的设计就是用来作为长连接使用的。不要频繁Open, Close。比较好的做法是,为每个不同的datastore建一个DB对象,保持这些对象Open。

数据查询操作

数据库查询的一般步骤如下:

1、调用 db.Query 执行 SQL 语句, 此方法会返回一个 *Rows。

2、通过 *Rows.Next() 迭代查询数据。
3、通过 *Rows.Scan() 读取每一行的值,并填充到变量。
4、调用 *Rows.Close() 关闭资源,并释放连接。

先来看看 db.Query() 方法,最终跟踪到 quertDC() 方法,这个方法中只有出错了才会去调用 releaseConn,否则返回*Rows,并且将 releaseConn 注入到 *Rows 中;我们再来看看 *Rows.Close() 方法,它最终会去调用 releaseConn。

所以,需要手动调用 rows.Close() 方法,或者 defer rows.Close() 来将连接放回连接池。

当然,如果 *Rows 是局部变量的话,离开作用域后会被自动回收,自动调用 close() 方法来释放,但是为了安全起见,使用 defer rows.Close()来处理。

示例:

func qu() {
rows, err := Db.Query("select * from user limit 1") if err != nil {
fmt.Println(err) return } ds := Db.Stats() fmt.Println(ds.InUse) // 1 fmt.Println(ds.OpenConnections) // 1 rows.Close() ds2 := Db.Stats() fmt.Println(ds2.InUse) // 0 fmt.Println(ds2.OpenConnections) // 1}

但是值得注意的是,rows 一旦被关闭后,将无法使用 rwos.Next() 迭代数据集。

那么能不能先关闭连接我再来慢慢遍历呢,使连接得到充分利用??貌似没有提供这个功能。

关于 rows:

type Rows struct {
dc *driverConn // owned; must call releaseConn when closed to release releaseConn func(error) rowsi driver.Rows cancel func() // called when Rows is closed, may be nil. closeStmt *driverStmt // if non-nil, statement to Close on close closemu sync.RWMutex closed bool lasterr error // non-nil only if closed is true lastcols []driver.Value}

都是不可导出的。

遍历数据集

func qu() {
rows, err := Db.Query("select id from photo_user_data where id = 8 limit 1") if err != nil {
fmt.Println(err) return } defer rows.Close() for rows.Next() {
var id int err := rows.Scan(&id) if err != nil{
fmt.Println(err) break } fmt.Println(id) }}

rows.Next() 返回 bool,表示是否有数据可以迭代;同时将一条数据放到 rows.lastcols ,它是一个切片,里面元素的顺序就是你查询的 column 顺序,然后 scan 方法遍历 rows.lastcols 来填充变量。

如果一条记录都没有显然就不会进入循环体。

for i, sv := range rs.lastcols {
err := convertAssignRows(dest[i], sv, rs) if err != nil {
return fmt.Errorf(`sql: Scan error on column index %d, name %q: %v`, i, rs.rowsi.Columns()[i], err) }}

通过位置而不是名称来对应,所以 Scan() 中参数的顺序自己要控制好。

当遍历完成之后会自动调用 rows.CLose() 释放连接,因此 rows 只能被遍历一次。

如何在不遍历的情况下知道是否查询到结果???

查询单条

使用 db.QueryRow() 得到 *Row

func (db *DB) QueryRow(query string, args ...interface{
}) *Row

调用 *Row 的 Scan() 方法填充变量,如果没有查询到结果则返回 ErrNoRows 的 error。

func (r *Row) Scan(dest ...interface{
}) error
func in() {
var id int err := Db.QueryRow("select id from photo_user_data where id = 7 limit 1").Scan(&id) if err != nil {
if err == sql.ErrNoRows {
fmt.Println("err no rows") } else {
fmt.Println(err) } } fmt.Print(id)}

QueryRow 其实是对 Query 的进一步封装。

Scan() 内部调用了 r.rows.Close() 来释放连接和资源。

增,删,改操作

增,删,改 都使用 db.Exec() 方法。

对于 db.Exec() 方法则不用担心,因为它主动做了释放连接的工作。

defer func() {
release(err)}()
func (db *DB) Exec(query string, args ...interface{
}) (Result, error)

其中 Result 接口包含两个方法

type Result interface {
LastInsertId() (int64, error) RowsAffected() (int64, error)}

对于mysql驱动而言,driverResult为:

// github.com\go-sql-driver\mysql\result.gotype mysqlResult struct {
affectedRows int64 insertId int64}func (res *mysqlResult) LastInsertId() (int64, error) {
return res.insertId, nil}func (res *mysqlResult) RowsAffected() (int64, error) {
return res.affectedRows, nil}

当执行出错了,err 会有值。否则,通过 LastInsertId() 和 RowsAffected() 判断执行结果。

func ex() {
Result, err := Db.Exec("update photo_user_data set nickName=? where id = ?", "haha", 7) if err != nil {
fmt.Println(err) return } fmt.Println("update success") fmt.Println(Result.RowsAffected())}

连接池的设计

获取连接的方法

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)

db.numOpen:打开着的连接的个数,打开 numOpen++;关闭 numOpen–;要求 numOpen <= maxOpen。

空闲连接池 db.freeConn[] 切片,存储了 inUse=false 的连接,当然这里面包括了超过了 db.maxLifetime 的连接,因此过期的连接不会自动清掉,被从 db.freeConn[] 中获取之后做判断,过期则直接Close。所以很多这样的代码:

var res Resultvar err errorfor i := 0; i < maxBadConnRetries; i++ {
res, err = db.exec(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn {
break }}if err == driver.ErrBadConn {
return db.exec(ctx, query, args, alwaysNewConn)}return res, err

先尝试 maxBadConnRetries 次从 db.freeConn[] 中获取连接,不行就直接创建连接,此时也要判断如果超过了 maxOpen 的话也不能直接创建连接,而是将请求写入到 db.connRequests 的 map 中,然后在释放连接的地方来优先分配给它们,然后多余的连接才放入 freeConn[] 中去。

查看方法 func (db *DB) putConnDBLocked(dc *driverConn, err error) bool

事务处理

开启事务

func (db *DB) Begin() (*Tx, error)func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)

*Tx 下面的方法:

func (tx *Tx) Commit() errorfunc (tx *Tx) Rollback() errorfunc (tx *Tx) Exec(query string, args ...interface{
}) (Result, error)func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{
}) (Result, error)func (tx *Tx) Prepare(query string) (*Stmt, error)func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error)func (tx *Tx) Query(query string, args ...interface{
}) (*Rows, error)func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{
}) (*Rows, error)func (tx *Tx) QueryRow(query string, args ...interface{
}) *Rowfunc (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{
}) *Rowfunc (tx *Tx) Stmt(stmt *Stmt) *Stmtfunc (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt
// The rollback will be ignored if the tx has been committed or rolled backdefer tx.Rollback()

默认会执行rollback操作,因此可不用认为指定。

可选配置

// TxOptions holds the transaction options to be used in DB.BeginTx.type TxOptions struct {
// Isolation is the transaction isolation level. // If zero, the driver or database's default level is used. Isolation IsolationLevel ReadOnly bool}

isolation 隔离级别

示例:

func tx() {
tx, err := Db.Begin() if err != nil {
fmt.Println(err) return } defer tx.Rollback() Result, err := tx.Exec("update photo_user_data set nickName=? where id = ?", "aaaaaaaa", 8) if err != nil {
fmt.Println(err) return } fmt.Println("update success") fmt.Println(Result.RowsAffected()) tx.Commit()}

事务与预编译

func txp() {
tx, err := Db.Begin() if err != nil {
fmt.Println(err) return } defer tx.Rollback() stmt, err := tx.Prepare("update photo_user_data set nickName=? where id = ?") if err != nil {
fmt.Println(err) return } Result, err := stmt.Exec("nnnnnnnnn", 8) if err != nil {
fmt.Println(err) return } fmt.Println(Result.RowsAffected()) tx.Commit()}

预编译SQL语句

SQL预编译后相当于得到一个模板,向这个模板填充参数即可被执行。

可以防止SQL注入,因为是模板填充而不是拼接SQL语句。
并且提升了SQL执行效率,因为使用编译后的语句是不用再去做SQL校验和编译工作。

所以,预编译要想达到提升执行效率的效果,前提是在执行相同的SQL语句(参数可以允许不一样)时,共享 stmt 对象,而不是每次都来 prepare。否则,预编译反而降低了性能,只是防止了SQL注入。

先使用SQL语句和占位符定义语句,在使用 Exec() 执行。

Db.Prepare(sql)编译增删改查语句得到 *stmt

func (db *DB) Prepare(query string) (*Stmt, error)

然后,查询使用 *Stmt.Query(), *Stmt.QueryRow() 方法,和上面的用法一样。

增,删,改使用 *Stmt.Exec() 方法,和上面的用法一样。

Prepare() 会自动释放连接。但是要手动释放 stmt

defer stmt.Close()

示例:

func pre() {
stmt, err := Db.Prepare("insert into photo_category(name, activity_id) value(?, ?)") if err != nil {
fmt.Println(err) return } defer stmt.Close() Result, err := stmt.Exec("haha", 1) if err != nil {
fmt.Println(err) return } fmt.Println(Result.LastInsertId()) // stmt 可重复使用,以提升执行效率 Result1, err := stmt.Exec("haha1", 11) Result2, err := stmt.Exec("haha2", 12) Result3, err := stmt.Exec("haha3", 13)}

可见,使用预编译的话,申请了两次连接。

mysql的预编译是针对连接的,也就是说,Prepare操作和Exec操作要使用同一个连接才可以,否则将没法执行,因为模板SQL和普通的SQL不一样。

那么该如何去管理这种连接呢?

在database/sql中使用了额外的结构 []connStmt{} 来存储当前 stmt 和其对应的连接,在 Prepare操作之后正常释放了连接,并记录到 []connStmt{},这样此连接依然可以被多处使用,

当执行 stmt.Exec / stmt.Query / stmt.QueryRow 操作时,获取连接的方式就不一样了,不是从连接池获取,而是从 []connStmt{} 去拿,此时如果 stmt 对应的连接正在别处被使用,

那么需要阻塞等待,如果此连接关闭了,则需要从连接池获取新的连接,然后需要额外再次编译并运行。

即便如此,在一个高并发的情况下,连接被占用的概率很大,那么预编译的方式来执行SQL,是否能提高性能还不一定,但是从防止SQL注入的角度来看,预编译还是必要的。

下面来看一下database/sql中预编译相关的代码:

// Prepare creates a prepared statement for later queries or executions.// Multiple queries or executions may be run concurrently from the// returned statement.// The caller must call the statement's Close method// when the statement is no longer needed.func (db *DB) Prepare(query string) (*Stmt, error) {
return db.PrepareContext(context.Background(), query)}func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
// TODO: check if db.driver supports an optional // driver.Preparer interface and call that instead, if so, // otherwise we make a prepared statement that's bound // to a connection, and to execute this prepared statement // we either need to use this connection (if it's free), else // get a new connection + re-prepare + execute on that one. dc, err := db.conn(ctx, strategy) if err != nil {
return nil, err } return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)}// prepareDC prepares a query on the driverConn and calls release before// returning. When cg == nil it implies that a connection pool is used, and// when cg != nil only a single driver connection is used.func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
var ds *driverStmt var err error defer func() {
release(err) }() withLock(dc, func() {
ds, err = dc.prepareLocked(ctx, cg, query) }) if err != nil {
return nil, err } stmt := &Stmt{
db: db, query: query, cg: cg, cgds: ds, } // When cg == nil this statement will need to keep track of various // connections they are prepared on and record the stmt dependency on // the DB. if cg == nil {
stmt.css = []connStmt{
{
dc, ds}} stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed) db.addDep(stmt, stmt) } return stmt, nil}// connStmt returns a free driver connection on which to execute the// statement, a function to call to release the connection, and a// statement bound to that connection.func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
if err = s.stickyErr; err != nil {
return } s.mu.Lock() if s.closed {
s.mu.Unlock() err = errors.New("sql: statement is closed") return } // In a transaction or connection, we always use the connection that the // stmt was created on. if s.cg != nil {
s.mu.Unlock() dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection. if err != nil {
return } return dc, releaseConn, s.cgds, nil } s.removeClosedStmtLocked() s.mu.Unlock() dc, err = s.db.conn(ctx, strategy) if err != nil {
return nil, nil, nil, err } s.mu.Lock() for _, v := range s.css {
if v.dc == dc {
s.mu.Unlock() return dc, dc.releaseConn, v.ds, nil } } s.mu.Unlock() // No luck; we need to prepare the statement on this connection withLock(dc, func() {
ds, err = s.prepareOnConnLocked(ctx, dc) }) if err != nil {
dc.releaseConn(err) return nil, nil, nil, err } return dc, dc.releaseConn, ds, nil}

转载地址:http://fnaui.baihongyu.com/

你可能感兴趣的文章
Visual Studio 2010:C++0x新特性
查看>>
drwtsn32.exe和adplus.vbs进行dump文件抓取
查看>>
cppcheck c++静态代码检查
查看>>
CLOSE_WAIT和TIME_WAIT
查看>>
在C++中使用Lua
查看>>
在Dll中调用自身的位图资源
查看>>
IP校验和详解
查看>>
C++中使用Mongo执行count和distinct运算
查看>>
一些socket的编程经验
查看>>
socket编程中select的使用
查看>>
C++获取文件大小常用技巧分享
查看>>
未来5年大机遇:做贩卖多巴胺的超级玩家
查看>>
30 岁之前,应当学会哪些技能?
查看>>
关于AIS编码解码的两个小问题
查看>>
GitHub 万星推荐:黑客成长技术清单
查看>>
可以在线C++编译的工具站点
查看>>
关于无人驾驶的过去、现在以及未来,看这篇文章就够了!
查看>>
所谓的进步和提升,就是完成认知升级
查看>>
Web开发之路
查看>>
昨夜今晨最大八卦终于坐实——人类首次直接探测到了引力波
查看>>