本文共 14632 字,大约阅读时间需要 48 分钟。
Golang 提供了database/sql包用于对SQL数据库的访问, 作为操作数据库的入口对象sql.DB, 主要为我们提供了两个重要的功能:
sql.DB 通过数据库驱动为我们提供管理底层数据库连接的打开和关闭操作. sql.DB 为我们管理数据库连接池需要注意的是,sql.DB表示操作数据库的抽象访问接口,而非一个数据库连接对象;它可以根据driver打开关闭数据库连接,管理连接池。正在使用的连接被标记为繁忙,用完后回到连接池等待下次使用。所以,如果你
没有把连接释放回连接池,会导致过多连接使系统资源耗尽。通常来说, 不应该直接使用驱动所提供的方法, 而是应该使用 sql.DB, 因此在导入 mysql 驱动时, 这里使用了匿名导入的方式( 在包路径前添加 _ ), 当导入了一个数据库驱动后, 此驱动会自行初始化并注册自己到
Golang的database/sql的驱动map中, 因此我们就可以通过 database/sql 包提供的方法访问数据库了。import _ "github.com/go-sql-driver/mysql"
driver.go 文件
找到mysql包的init方法
func init() { sql.Register("mysql", &MySQLDriver{ })}
sql 为 database/sql 包,说明mysql包自身就依赖 database/sql 包。
sql采用map的方式存储驱动。
func Register(name string, driver driver.Driver) { driversMu.Lock() defer driversMu.Unlock() if driver == nil { panic("sql: Register driver is nil") } if _, dup := drivers[name]; dup { panic("sql: Register called twice for driver " + name) } drivers[name] = driver}type Driver interface { Open(name string) (Conn, error)}
可见驱动需要实现 Open 方法。
type MySQLDriver struct{ }func (d MySQLDriver) Open(dsn string) (driver.Conn, error) { cfg, err := ParseDSN(dsn) if err != nil { return nil, err } c := &connector{ cfg: cfg, } return c.Connect(context.Background())}
db, err := sql.Open("mysql", cfg)
sql.open 方法:
// Open opens a database specified by its database driver name and a// driver-specific data source name, usually consisting of at least a// database name and connection information.//// Most users will open a database via a driver-specific connection// helper function that returns a *DB. No database drivers are included// in the Go standard library. See https://golang.org/s/sqldrivers for// a list of third-party drivers.//// Open may just validate its arguments without creating a connection// to the database. To verify that the data source name is valid, call// Ping.//// The returned DB is safe for concurrent use by multiple goroutines// and maintains its own pool of idle connections. Thus, the Open// function should be called just once. It is rarely necessary to// close a DB.func Open(driverName, dataSourceName string) (*DB, error) { driversMu.RLock() driveri, ok := drivers[driverName] driversMu.RUnlock() if !ok { return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName) } if driverCtx, ok := driveri.(driver.DriverContext); ok { connector, err := driverCtx.OpenConnector(dataSourceName) if err != nil { return nil, err } return OpenDB(connector), nil } return OpenDB(dsnConnector{ dsn: dataSourceName, driver: driveri}), nil}
得到的 *DB 是协程安全的,要确保 Open 方法只被调用一次,避免每个请求过来都去 Open ,在使用过程中更加没必要close a DB对象。
func OpenDB(c driver.Connector) *DB { ctx, cancel := context.WithCancel(context.Background()) db := &DB{ connector: c, openerCh: make(chan struct{ }, connectionRequestQueueSize), lastPut: make(map[*driverConn]string), connRequests: make(map[uint64]chan connRequest), stop: cancel, } go db.connectionOpener(ctx) return db}
会启动一个goroutine来异步的创建连接,在后面会提到。
// Runs in a separate goroutine, opens new connections when requested.func (db *DB) connectionOpener(ctx context.Context) { for { select { case <-ctx.Done(): return case <-db.openerCh: db.openNewConnection(ctx) } }}
当收到 创建连接 的消息时执行 db.openNewConnection 去调用驱动的 Connector 对象的 Connect 方法:
ci, err := db.connector.Connect(ctx)
再来看看mysql的connector
func (d MySQLDriver) OpenConnector(dsn string) (driver.Connector, error) { cfg, err := ParseDSN(dsn) if err != nil { return nil, err } return &connector{ cfg: cfg, }, nil}
driver.Connector 需要实现两个方法:
type Connector interface { Connect(context.Context) (Conn, error) Driver() Driver}
Connect方法才是真正发起连接的操作。
由 db.openerCh 通道表明,sql.Open并不会立即检验参数和创建连接,而是等待通知过来才去创建。如果想立即验证连接,需要用Ping()方法。
db.Ping()
Ping完之后自动调用 dc.releaseConn 释放了连接。
sql.DB的设计就是用来作为长连接使用的。不要频繁Open, Close。比较好的做法是,为每个不同的datastore建一个DB对象,保持这些对象Open。
数据库查询的一般步骤如下:
1、调用 db.Query 执行 SQL 语句, 此方法会返回一个 *Rows。
2、通过 *Rows.Next() 迭代查询数据。 3、通过 *Rows.Scan() 读取每一行的值,并填充到变量。 4、调用 *Rows.Close() 关闭资源,并释放连接。先来看看 db.Query() 方法,最终跟踪到 quertDC() 方法,这个方法中只有出错了才会去调用 releaseConn,否则返回*Rows,并且将 releaseConn 注入到 *Rows 中;我们再来看看 *Rows.Close() 方法,它最终会去调用 releaseConn。
所以,需要手动调用 rows.Close() 方法,或者 defer rows.Close() 来将连接放回连接池。
当然,如果 *Rows 是局部变量的话,离开作用域后会被自动回收,自动调用 close() 方法来释放,但是为了安全起见,使用 defer rows.Close()来处理。
示例:
func qu() { rows, err := Db.Query("select * from user limit 1") if err != nil { fmt.Println(err) return } ds := Db.Stats() fmt.Println(ds.InUse) // 1 fmt.Println(ds.OpenConnections) // 1 rows.Close() ds2 := Db.Stats() fmt.Println(ds2.InUse) // 0 fmt.Println(ds2.OpenConnections) // 1}
但是值得注意的是,rows 一旦被关闭后,将无法使用 rwos.Next() 迭代数据集。
那么能不能先关闭连接我再来慢慢遍历呢,使连接得到充分利用??貌似没有提供这个功能。
关于 rows:
type Rows struct { dc *driverConn // owned; must call releaseConn when closed to release releaseConn func(error) rowsi driver.Rows cancel func() // called when Rows is closed, may be nil. closeStmt *driverStmt // if non-nil, statement to Close on close closemu sync.RWMutex closed bool lasterr error // non-nil only if closed is true lastcols []driver.Value}
都是不可导出的。
遍历数据集
func qu() { rows, err := Db.Query("select id from photo_user_data where id = 8 limit 1") if err != nil { fmt.Println(err) return } defer rows.Close() for rows.Next() { var id int err := rows.Scan(&id) if err != nil{ fmt.Println(err) break } fmt.Println(id) }}
rows.Next() 返回 bool,表示是否有数据可以迭代;同时将一条数据放到 rows.lastcols ,它是一个切片,里面元素的顺序就是你查询的 column 顺序,然后 scan 方法遍历 rows.lastcols 来填充变量。
如果一条记录都没有显然就不会进入循环体。
for i, sv := range rs.lastcols { err := convertAssignRows(dest[i], sv, rs) if err != nil { return fmt.Errorf(`sql: Scan error on column index %d, name %q: %v`, i, rs.rowsi.Columns()[i], err) }}
通过位置而不是名称来对应,所以 Scan() 中参数的顺序自己要控制好。
当遍历完成之后会自动调用 rows.CLose() 释放连接,因此 rows 只能被遍历一次。
如何在不遍历的情况下知道是否查询到结果???
使用 db.QueryRow() 得到 *Row
func (db *DB) QueryRow(query string, args ...interface{ }) *Row
调用 *Row 的 Scan() 方法填充变量,如果没有查询到结果则返回 ErrNoRows 的 error。
func (r *Row) Scan(dest ...interface{ }) error
func in() { var id int err := Db.QueryRow("select id from photo_user_data where id = 7 limit 1").Scan(&id) if err != nil { if err == sql.ErrNoRows { fmt.Println("err no rows") } else { fmt.Println(err) } } fmt.Print(id)}
QueryRow 其实是对 Query 的进一步封装。
Scan() 内部调用了 r.rows.Close() 来释放连接和资源。增,删,改 都使用 db.Exec() 方法。
对于 db.Exec() 方法则不用担心,因为它主动做了释放连接的工作。
defer func() { release(err)}()
func (db *DB) Exec(query string, args ...interface{ }) (Result, error)
其中 Result 接口包含两个方法
type Result interface { LastInsertId() (int64, error) RowsAffected() (int64, error)}
对于mysql驱动而言,driverResult为:
// github.com\go-sql-driver\mysql\result.gotype mysqlResult struct { affectedRows int64 insertId int64}func (res *mysqlResult) LastInsertId() (int64, error) { return res.insertId, nil}func (res *mysqlResult) RowsAffected() (int64, error) { return res.affectedRows, nil}
当执行出错了,err 会有值。否则,通过 LastInsertId() 和 RowsAffected() 判断执行结果。
func ex() { Result, err := Db.Exec("update photo_user_data set nickName=? where id = ?", "haha", 7) if err != nil { fmt.Println(err) return } fmt.Println("update success") fmt.Println(Result.RowsAffected())}
获取连接的方法
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error)
db.numOpen:打开着的连接的个数,打开 numOpen++;关闭 numOpen–;要求 numOpen <= maxOpen。
空闲连接池 db.freeConn[] 切片,存储了 inUse=false 的连接,当然这里面包括了超过了 db.maxLifetime 的连接,因此过期的连接不会自动清掉,被从 db.freeConn[] 中获取之后做判断,过期则直接Close。所以很多这样的代码:var res Resultvar err errorfor i := 0; i < maxBadConnRetries; i++ { res, err = db.exec(ctx, query, args, cachedOrNewConn) if err != driver.ErrBadConn { break }}if err == driver.ErrBadConn { return db.exec(ctx, query, args, alwaysNewConn)}return res, err
先尝试 maxBadConnRetries 次从 db.freeConn[] 中获取连接,不行就直接创建连接,此时也要判断如果超过了 maxOpen 的话也不能直接创建连接,而是将请求写入到 db.connRequests 的 map 中,然后在释放连接的地方来优先分配给它们,然后多余的连接才放入 freeConn[] 中去。
查看方法func (db *DB) putConnDBLocked(dc *driverConn, err error) bool
开启事务
func (db *DB) Begin() (*Tx, error)func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error)
*Tx 下面的方法:
func (tx *Tx) Commit() errorfunc (tx *Tx) Rollback() errorfunc (tx *Tx) Exec(query string, args ...interface{ }) (Result, error)func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{ }) (Result, error)func (tx *Tx) Prepare(query string) (*Stmt, error)func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error)func (tx *Tx) Query(query string, args ...interface{ }) (*Rows, error)func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{ }) (*Rows, error)func (tx *Tx) QueryRow(query string, args ...interface{ }) *Rowfunc (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{ }) *Rowfunc (tx *Tx) Stmt(stmt *Stmt) *Stmtfunc (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt
// The rollback will be ignored if the tx has been committed or rolled backdefer tx.Rollback()
默认会执行rollback操作,因此可不用认为指定。
可选配置
// TxOptions holds the transaction options to be used in DB.BeginTx.type TxOptions struct { // Isolation is the transaction isolation level. // If zero, the driver or database's default level is used. Isolation IsolationLevel ReadOnly bool}
isolation 隔离级别
示例:
func tx() { tx, err := Db.Begin() if err != nil { fmt.Println(err) return } defer tx.Rollback() Result, err := tx.Exec("update photo_user_data set nickName=? where id = ?", "aaaaaaaa", 8) if err != nil { fmt.Println(err) return } fmt.Println("update success") fmt.Println(Result.RowsAffected()) tx.Commit()}
事务与预编译
func txp() { tx, err := Db.Begin() if err != nil { fmt.Println(err) return } defer tx.Rollback() stmt, err := tx.Prepare("update photo_user_data set nickName=? where id = ?") if err != nil { fmt.Println(err) return } Result, err := stmt.Exec("nnnnnnnnn", 8) if err != nil { fmt.Println(err) return } fmt.Println(Result.RowsAffected()) tx.Commit()}
SQL预编译后相当于得到一个模板,向这个模板填充参数即可被执行。
可以防止SQL注入,因为是模板填充而不是拼接SQL语句。 并且提升了SQL执行效率,因为使用编译后的语句是不用再去做SQL校验和编译工作。所以,预编译要想达到提升执行效率的效果,前提是在执行相同的SQL语句(参数可以允许不一样)时,共享 stmt 对象,而不是每次都来 prepare。否则,预编译反而降低了性能,只是防止了SQL注入。
先使用SQL语句和占位符定义语句,在使用 Exec() 执行。
Db.Prepare(sql)编译增删改查语句得到 *stmtfunc (db *DB) Prepare(query string) (*Stmt, error)
然后,查询使用 *Stmt.Query(), *Stmt.QueryRow() 方法,和上面的用法一样。
增,删,改使用 *Stmt.Exec() 方法,和上面的用法一样。Prepare() 会自动释放连接。但是要手动释放 stmt
defer stmt.Close()
示例:
func pre() { stmt, err := Db.Prepare("insert into photo_category(name, activity_id) value(?, ?)") if err != nil { fmt.Println(err) return } defer stmt.Close() Result, err := stmt.Exec("haha", 1) if err != nil { fmt.Println(err) return } fmt.Println(Result.LastInsertId()) // stmt 可重复使用,以提升执行效率 Result1, err := stmt.Exec("haha1", 11) Result2, err := stmt.Exec("haha2", 12) Result3, err := stmt.Exec("haha3", 13)}
可见,使用预编译的话,申请了两次连接。
mysql的预编译是针对连接的,也就是说,Prepare操作和Exec操作要使用同一个连接才可以,否则将没法执行,因为模板SQL和普通的SQL不一样。
那么该如何去管理这种连接呢?
在database/sql中使用了额外的结构 []connStmt{} 来存储当前 stmt 和其对应的连接,在 Prepare操作之后正常释放了连接,并记录到 []connStmt{},这样此连接依然可以被多处使用,
当执行 stmt.Exec / stmt.Query / stmt.QueryRow 操作时,获取连接的方式就不一样了,不是从连接池获取,而是从 []connStmt{} 去拿,此时如果 stmt 对应的连接正在别处被使用,
那么需要阻塞等待,如果此连接关闭了,则需要从连接池获取新的连接,然后需要额外再次编译并运行。
即便如此,在一个高并发的情况下,连接被占用的概率很大,那么预编译的方式来执行SQL,是否能提高性能还不一定,但是从防止SQL注入的角度来看,预编译还是必要的。
下面来看一下database/sql中预编译相关的代码:
// Prepare creates a prepared statement for later queries or executions.// Multiple queries or executions may be run concurrently from the// returned statement.// The caller must call the statement's Close method// when the statement is no longer needed.func (db *DB) Prepare(query string) (*Stmt, error) { return db.PrepareContext(context.Background(), query)}func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) { // TODO: check if db.driver supports an optional // driver.Preparer interface and call that instead, if so, // otherwise we make a prepared statement that's bound // to a connection, and to execute this prepared statement // we either need to use this connection (if it's free), else // get a new connection + re-prepare + execute on that one. dc, err := db.conn(ctx, strategy) if err != nil { return nil, err } return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)}// prepareDC prepares a query on the driverConn and calls release before// returning. When cg == nil it implies that a connection pool is used, and// when cg != nil only a single driver connection is used.func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) { var ds *driverStmt var err error defer func() { release(err) }() withLock(dc, func() { ds, err = dc.prepareLocked(ctx, cg, query) }) if err != nil { return nil, err } stmt := &Stmt{ db: db, query: query, cg: cg, cgds: ds, } // When cg == nil this statement will need to keep track of various // connections they are prepared on and record the stmt dependency on // the DB. if cg == nil { stmt.css = []connStmt{ { dc, ds}} stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed) db.addDep(stmt, stmt) } return stmt, nil}// connStmt returns a free driver connection on which to execute the// statement, a function to call to release the connection, and a// statement bound to that connection.func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) { if err = s.stickyErr; err != nil { return } s.mu.Lock() if s.closed { s.mu.Unlock() err = errors.New("sql: statement is closed") return } // In a transaction or connection, we always use the connection that the // stmt was created on. if s.cg != nil { s.mu.Unlock() dc, releaseConn, err = s.cg.grabConn(ctx) // blocks, waiting for the connection. if err != nil { return } return dc, releaseConn, s.cgds, nil } s.removeClosedStmtLocked() s.mu.Unlock() dc, err = s.db.conn(ctx, strategy) if err != nil { return nil, nil, nil, err } s.mu.Lock() for _, v := range s.css { if v.dc == dc { s.mu.Unlock() return dc, dc.releaseConn, v.ds, nil } } s.mu.Unlock() // No luck; we need to prepare the statement on this connection withLock(dc, func() { ds, err = s.prepareOnConnLocked(ctx, dc) }) if err != nil { dc.releaseConn(err) return nil, nil, nil, err } return dc, dc.releaseConn, ds, nil}
转载地址:http://fnaui.baihongyu.com/