Android数据库源码分析(2)-SQLiteDatabase的实现以及多线程行为

Android数据库源码分析(2)-SQLiteDatabase的实现以及多线程行为

 

本系列主要关注安卓数据库的线程行为,分为四个部分:
(1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
(2)SQLiteDatabase的实现以及多线程行为
(3)连接缓存池SQLiteConnectionPool
(4)SQLiteDatabase多线程实践

本篇主要关注SQLiteDatabase的线程同步实现与架构实现。

1 SQLiteClosable的acquireReference与releaseReference方法

SQLiteClosableSQLiteDatabase的父类,也同时是数据库下其他几个类的父类。其中实现了引用计数逻辑来控制资源释放的时机。

private int mReferenceCount = 1;

public void acquireReference() {
    synchronized(this) {
        if (mReferenceCount <= 0) {
            throw new IllegalStateException(
                    "attempt to re-open an already-closed object: " + this);
        }
        mReferenceCount++;
    }
}

public void releaseReference() {
    boolean refCountIsZero = false;
    synchronized(this) {
        refCountIsZero = --mReferenceCount == 0;
    }
    if (refCountIsZero) {
        onAllReferencesReleased();
    }
}

可以看到这里用mReferenceCount简单地实现了一个引用计数。而引用计数的初始值是1。SQLiteDatabase会在每次操作前调用一次acquireReference,而在结束后调用一次releaseReference。为了方便,下文中把这样的被acquireReferencereleaseReference包裹的过程称为一次“操作”。
那么如果这两个方法保持成对调用的话,是不是就不可能触发onAllReferenceReleased方法?事实上,SQLiteClosable还有一个方法close调用了releaseReference。由于锁的存在,只要不在其它“操作”中调用close,调用close之后mReferenceCount的值可以断定是0。
到这里为止,感觉上是可以用一个boolean值来标记引用状态的。因为由于锁的存在,只要各个“操作”是序列进行的(没有一个“操作”调用了另一个“操作”的情况),mReferenceCount只可能是0和1。推测引用计数就是为了应付“操作”之间存在调用这种情况。这就像同一个线程里的嵌套锁需要进行计数一样。

2 SQLiteDatabase的打开与关闭

2.1 关闭

上文中提到的onAllReferenceReleased是一个抽象方法。其在SQLiteDatabase中的实现为

@Override
protected void onAllReferencesReleased() {
    dispose(false);
}

在finalize中同样调用了dispose方法

protected void finalize() throws Throwable {
    try {
        dispose(true);
    } finally {
        super.finalize();
    }
}

dispose的实现为

private void dispose(boolean finalized) {
    final SQLiteConnectionPool pool;
    synchronized (mLock) {
        if (mCloseGuardLocked != null) {
            if (finalized) {
                //CloseGuard是一个监测是否及时调用close方法的类,一般来说除了输出日志并不会做别的什么
                //这里事实上就是在finalize的时候如果没有close过,就输出一条日志
                mCloseGuardLocked.warnIfOpen();
            }
            mCloseGuardLocked.close();
        }
        pool = mConnectionPoolLocked;//这个mConnectionPool是连接池。此方法里将其置空并关闭。后文详细讨论其作用。
        mConnectionPoolLocked = null;
    }
    if (!finalized) {
        //sActiveDatabases是一个静态的WeakHashMap,用key来放置所有活动数据库,而value并没有作用。dispose的时候自然要移除this。
        //跟踪代码分析下来,用这个map只是为了bug report
        synchronized (sActiveDatabases) {
            sActiveDatabases.remove(this);
        }
        if (pool != null) {
            pool.close();
        }
    }
}

2.2 打开

在本系列第一篇中我们曾看到过,最终的打开数据库的是一个静态方法,SQLiteDatabase.openDatabase(String path, CursorFactory factory, int flags, DatabaseErrorHandler errorHandler)

public static SQLiteDatabase openDatabase(String path, CursorFactory factory, int flags,
        DatabaseErrorHandler errorHandler) {
    SQLiteDatabase db = new SQLiteDatabase(path, flags, factory, errorHandler);
    db.open();
    return db;
}

这里很简单,就是新建一个对象,然后调用open。构造器里只有一些初始化,略过。着重看open方法:

private void open() {
    try {
        try {
            openInner();//尝试一次
        } catch (SQLiteDatabaseCorruptException ex) {
            onCorruption();//失败了,再次尝试前调用另一个方法。
            openInner();
        }
    } catch (SQLiteException ex) {
        Log.e(TAG, "Failed to open database '" + getLabel() + "'.", ex);
        close();
        throw ex;
    }
}

private void openInner() {
    synchronized (mLock) {
        assert mConnectionPoolLocked == null;
        mConnectionPoolLocked = SQLiteConnectionPool.open(mConfigurationLocked);
        mCloseGuardLocked.open("close");
    }

    synchronized (sActiveDatabases) {//这是之前那个WeakHashMap
        sActiveDatabases.put(this, null);
    }
}

void onCorruption() {
    EventLog.writeEvent(EVENT_DB_CORRUPT, getLabel());
    mErrorHandler.onCorruption(this);
}

open中会尝试调用openInner。如果失败一次,则调用onCorruption,随后再尝试一次。mErrorHandler是构造器传入的,构造器参数由静态方法openDatabase传入,而这个参数又最终从SQLiteOpenHelper传入。

openInner中做的事情,从命名上看,是开启一个SQLiteConnectionPool即数据库连接池。简单地说,数据库连接池维持了对数据库的多个连接。数据库连接的类是SQLiteConnection

3 线程内单例的SQLiteSession

private final ThreadLocal<SQLiteSession> mThreadSession = new ThreadLocal<SQLiteSession>() {
    @Override
    protected SQLiteSession initialValue() {
        return createSession();
    }
};

SQLiteSession getThreadSession() {
    return mThreadSession.get(); // initialValue() throws if database closed
}

SQLiteSession createSession() {
    final SQLiteConnectionPool pool;
    synchronized (mLock) {
        throwIfNotOpenLocked();
        pool = mConnectionPoolLocked;
    }
    return new SQLiteSession(pool);
}

ThreadLocal会在每个线程内维护一个对象,而在线程结束时解除对对象的引用。initialValue方法会在线程中不存在已有对象时创建一个,不Override的话会给出一个null。除此之外也可以通过ThreadLocal.set来给本线程配置一个对象。
可以看到mThreadSession是一个ThreadLocal。调用getThreadSession会获取一个线程内单例的SQLiteSession对象。

SQLiteSession是提供数据库操作能力(增删改查以及事务)的一个单元。它会从SQLiteConnectionPool即连接池中获取连接,最终对数据库进行操作。

到这儿类已经有点多了。整理一下逻辑:
(1)SQLiteDatabase持有一个ThreadLocal,用于对每个线程生成一个SQLiteSession
(2)SQLiteSession持有SQLiteConnectionPool(虽然SQLiteDatabase也持有连接池对象,但它只用来传递给SQLiteSession),但是同一个SQLiteDatabase下的SQLiteSession是共用一个SQLiteConnectionPool的;
(3)SQLiteConnectionPool管理SQLiteConnection并适时向SQLiteSession提供之;
(4)SQLiteConnection直接对底层数据库进行操作(这个类里面才有大量的native方法)。

接下来分析一下SQLiteSession

获取与释放连接,还是一个引用计数实现:

private final SQLiteConnectionPool mConnectionPool;//构造器中初始化,值从SQLiteDatabase对象中传入
private SQLiteConnection mConnection;
private int mConnectionFlags;
private int mConnectionUseCount;//无处不在的引用计数

private void acquireConnection(String sql, int connectionFlags,
        CancellationSignal cancellationSignal) {
    if (mConnection == null) {
        assert mConnectionUseCount == 0;
        mConnection = mConnectionPool.acquireConnection(sql, connectionFlags,
                cancellationSignal); // might throw
        mConnectionFlags = connectionFlags;
    }
    mConnectionUseCount += 1;
}

private void releaseConnection() {
    assert mConnection != null;
    assert mConnectionUseCount > 0;
    if (--mConnectionUseCount == 0) {
        try {
            mConnectionPool.releaseConnection(mConnection); // might throw
        } finally {
            mConnection = null;
        }
    }
}

具体的数据库操作有很多executeXXX形式的方法,逻辑大同小异。挑一个看看:

public int executeForChangedRowCount(String sql, Object[] bindArgs, int connectionFlags,
        CancellationSignal cancellationSignal) {
    if (sql == null) {
        throw new IllegalArgumentException("sql must not be null.");
    }

    if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {//排除特殊操作
        return 0;
    }

    //获取连接
    acquireConnection(sql, connectionFlags, cancellationSignal); // might throw
    try {
        //底层数据库操作。本文不关心。
        return mConnection.executeForChangedRowCount(sql, bindArgs,
                cancellationSignal); // might throw
    } finally {
        //释放连接
        releaseConnection(); // might throw
    }
}

//用来支持'BEGIN','COMMIT','ROLLBACK'的操作。就是与Transaction相关的操作。
private boolean executeSpecial(String sql, Object[] bindArgs, int connectionFlags,
        CancellationSignal cancellationSignal) {
    if (cancellationSignal != null) {
        cancellationSignal.throwIfCanceled();
    }

    final int type = DatabaseUtils.getSqlStatementType(sql);
    switch (type) {
        case DatabaseUtils.STATEMENT_BEGIN:
            beginTransaction(TRANSACTION_MODE_EXCLUSIVE, null, connectionFlags,
                    cancellationSignal);
            return true;

        case DatabaseUtils.STATEMENT_COMMIT:
            setTransactionSuccessful();
            endTransaction(cancellationSignal);
            return true;

        case DatabaseUtils.STATEMENT_ABORT:
            endTransaction(cancellationSignal);
            return true;
    }
    return false;
}

4 单次完整的SQLite操作

4.1 SQLiteStatement

以最简单的delete方法为例。其它方法的流程均大同小异。

public int delete(String table, String whereClause, String[] whereArgs) {
    acquireReference();
    try {
        SQLiteStatement statement =  new SQLiteStatement(this, "DELETE FROM " + table +
                (!TextUtils.isEmpty(whereClause) ? " WHERE " + whereClause : ""), whereArgs);
        try {
            return statement.executeUpdateDelete();
        } finally {
            statement.close();
        }
    } finally {
        releaseReference();
    }
}

先用SQLiteStatement做一些sql转义和拼接,然后调用statement.executeUpdateDelete()

具体看一下executeUpdateDelete

//以下来自SQLiteStatement
public int executeUpdateDelete() {
    acquireReference();//注意这里是SQLiteStatement内的引用计数,不是SQLiteDatabase了。
    try {
        return getSession().executeForChangedRowCount(
                getSql(), getBindArgs(), getConnectionFlags(), null);//上一节分析过了,执行SQL。
    } catch (SQLiteDatabaseCorruptException ex) {
        onCorruption();
        throw ex;
    } finally {
        releaseReference();
    }
}

//这个方法在父类SQLiteProgram中。又回到了上一小节的getThreadSession。获取线程内的单例。
protected final SQLiteSession getSession() {
    return mDatabase.getThreadSession();
}

4.2 SQLiteDirectCursorDriver与SQLiteQuery

与4.1不同的是,在进行query操作时,最终没有使用SQLiteStatement类,而是通过SQLiteDirectCursorDriver间接使用了SQLiteQuery。而SQLiteQuerySQLiteStatement同为SQLiteProgram的子类,完成类似的功能。

所有的query操作最终均调用这样一个方法:

public Cursor rawQueryWithFactory(
        CursorFactory cursorFactory, String sql, String[] selectionArgs,
        String editTable, CancellationSignal cancellationSignal) {
    acquireReference();
    try {
        SQLiteCursorDriver driver = new SQLiteDirectCursorDriver(this, sql, editTable,
                cancellationSignal);
        return driver.query(cursorFactory != null ? cursorFactory : mCursorFactory,
                selectionArgs);
    } finally {
        releaseReference();
    }
}

SQLiteDirectCursorDriver的query方法如下:

public Cursor query(CursorFactory factory, String[] selectionArgs) {
    final SQLiteQuery query = new SQLiteQuery(mDatabase, mSql, mCancellationSignal);
    final Cursor cursor;
    try {
        query.bindAllArgsAsStrings(selectionArgs);

        if (factory == null) {
            cursor = new SQLiteCursor(this, mEditTable, query);
        } else {
            cursor = factory.newCursor(mDatabase, this, mEditTable, query);
        }
    } catch (RuntimeException ex) {
        query.close();
        throw ex;
    }

    mQuery = query;
    return cursor;
}

其中新建了一个SQLiteQuery,并绑定参数。随后新建一个Cursor,这就是最终返回的Cursor对象。接下来考察无CursorFactory情况下默认返回的SQLiteCursor

AbstractCursor中各种move方法均会调用moveToPosition,而moveToPosition会调用onMoveSQliteCursoronMove的实现为:

@Override
public boolean onMove(int oldPosition, int newPosition) {
    // Make sure the row at newPosition is present in the window
    if (mWindow == null || newPosition < mWindow.getStartPosition() ||
            newPosition >= (mWindow.getStartPosition() + mWindow.getNumRows())) {
        fillWindow(newPosition);
    }

    return true;
}

private void fillWindow(int requiredPos) {
    clearOrCreateWindow(getDatabase().getPath());

    try {
        if (mCount == NO_COUNT) {
            int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos, 0);
            mCount = mQuery.fillWindow(mWindow, startPos, requiredPos, true);
            mCursorWindowCapacity = mWindow.getNumRows();
            if (Log.isLoggable(TAG, Log.DEBUG)) {
                Log.d(TAG, "received count(*) from native_fill_window: " + mCount);
            }
        } else {
            int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos,
                    mCursorWindowCapacity);
            mQuery.fillWindow(mWindow, startPos, requiredPos, false);
        }
    } catch (RuntimeException ex) {
        // Close the cursor window if the query failed and therefore will
        // not produce any results.  This helps to avoid accidentally leaking
        // the cursor window if the client does not correctly handle exceptions
        // and fails to close the cursor.
        closeWindow();
        throw ex;
    }
}

核心逻辑在mQuery.fillWindow(mWindow, startPos, requiredPos, false);这里。mQuery就是之前传入的SQLiteQuery对象。查看其fillWindow方法:

int fillWindow(CursorWindow window, int startPos, int requiredPos, boolean countAllRows) {
    acquireReference();
    try {
        window.acquireReference();
        try {
            int numRows = getSession().executeForCursorWindow(getSql(), getBindArgs(),
                    window, startPos, requiredPos, countAllRows, getConnectionFlags(),
                    mCancellationSignal);
            return numRows;
        } catch (SQLiteDatabaseCorruptException ex) {
            onCorruption();
            throw ex;
        } catch (SQLiteException ex) {
            Log.e(TAG, "exception: " + ex.getMessage() + "; query: " + getSql());
            throw ex;
        } finally {
            window.releaseReference();
        }
    } finally {
        releaseReference();
    }
}

可以看到,最终回到了SQLiteSession.executeXXX方法逻辑之下。其余即与上一节类似。

而从Cursor中取出数据的过程,则最终是由CursorWindow下的一系列native方法来完成,我认为属于Cursor的代码体系了,这里不重点展开。

5 Transaction

5.1 beginTransaction

//一群差不多的beginTransaction方法最终调用到了这里
private void beginTransaction(SQLiteTransactionListener transactionListener,
        boolean exclusive) {
    acquireReference();//怎么老是你
    try {
        getThreadSession().beginTransaction(
                exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
                        SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
                transactionListener,
                getThreadDefaultConnectionFlags(false /*readOnly*/), null);
    } finally {
        releaseReference();
    }
}

//上面的方法调用了这个方法。这套flags做了两件小事:1.确定只读还是可写  2.如果是主线程,就要提高连接的优先级
int getThreadDefaultConnectionFlags(boolean readOnly) {
    int flags = readOnly ? SQLiteConnectionPool.CONNECTION_FLAG_READ_ONLY :
            SQLiteConnectionPool.CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY;
    if (isMainThread()) {
        flags |= SQLiteConnectionPool.CONNECTION_FLAG_INTERACTIVE;
    }
    return flags;
}

还是要看SQLiteSession内部:

public void beginTransaction(int transactionMode,
        SQLiteTransactionListener transactionListener, int connectionFlags,
        CancellationSignal cancellationSignal) {
    throwIfTransactionMarkedSuccessful();//一点合法性检查,不贴了
    beginTransactionUnchecked(transactionMode, transactionListener, connectionFlags,
            cancellationSignal);
}

private void beginTransactionUnchecked(int transactionMode,
        SQLiteTransactionListener transactionListener, int connectionFlags,
        CancellationSignal cancellationSignal) {
    if (cancellationSignal != null) {
    //cancellationSignal从beginTransaction以及SQLiteStatement诸方法传入的均为null,调查发现仅query时可以传入此参数。
        cancellationSignal.throwIfCanceled();
    }

    if (mTransactionStack == null) {//Transaction栈为空时才获取连接。
        acquireConnection(null, connectionFlags, cancellationSignal); // might throw
    }
    try {
        // Set up the transaction such that we can back out safely
        // in case we fail part way.
        if (mTransactionStack == null) {//如果没有进行中的Transaction,创建一个并BEGIN
            // Execute SQL might throw a runtime exception.
            switch (transactionMode) {
                case TRANSACTION_MODE_IMMEDIATE:
                    mConnection.execute("BEGIN IMMEDIATE;", null,
                            cancellationSignal); // might throw
                    break;
                case TRANSACTION_MODE_EXCLUSIVE:
                    mConnection.execute("BEGIN EXCLUSIVE;", null,
                            cancellationSignal); // might throw
                    break;
                default:
                    mConnection.execute("BEGIN;", null, cancellationSignal); // might throw
                    break;
            }
        }

        // Listener might throw a runtime exception.
        if (transactionListener != null) {
            try {
                transactionListener.onBegin(); // might throw
            } catch (RuntimeException ex) {
                if (mTransactionStack == null) {
                    mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
                }
                throw ex;
            }
        }

        // Bookkeeping can't throw, except an OOM, which is just too bad...
        Transaction transaction = obtainTransaction(transactionMode, transactionListener);//创建事务
        transaction.mParent = mTransactionStack;
        mTransactionStack = transaction;//入栈
    } finally {
        if (mTransactionStack == null) {//这里要栈为空时才释放连接。不为空时永远持有一个连接。
            releaseConnection(); // might throw
        }
    }
}

private static final class Transaction {
    public Transaction mParent;//这个是个链表,或者说在这里充当了一个栈
    public int mMode;
    public SQLiteTransactionListener mListener;
    public boolean mMarkedSuccessful;
    public boolean mChildFailed;
}

5.2 setTransactionSuccessful与endTransaction

直接看SQLiteSession吧:

public void setTransactionSuccessful() {
    throwIfNoTransaction();
    throwIfTransactionMarkedSuccessful();

    mTransactionStack.mMarkedSuccessful = true;//仅仅是个标记
}

public void endTransaction(CancellationSignal cancellationSignal) {
    throwIfNoTransaction();
    assert mConnection != null;

    endTransactionUnchecked(cancellationSignal, false);
}

private void endTransactionUnchecked(CancellationSignal cancellationSignal, boolean yielding) {
    if (cancellationSignal != null) {
        cancellationSignal.throwIfCanceled();
    }

    final Transaction top = mTransactionStack;
    boolean successful = (top.mMarkedSuccessful || yielding) && !top.mChildFailed;//如果有子Transaction失败,也是失败的

    RuntimeException listenerException = null;
    final SQLiteTransactionListener listener = top.mListener;
    if (listener != null) {
        try {
            if (successful) {
                listener.onCommit(); // might throw
            } else {
                listener.onRollback(); // might throw
            }
        } catch (RuntimeException ex) {
            listenerException = ex;
            successful = false;
        }
    }

    mTransactionStack = top.mParent;//退栈
    recycleTransaction(top);//回收

    if (mTransactionStack != null) {//还没到最外层事务,只做个标记
        if (!successful) {
            mTransactionStack.mChildFailed = true;
        }
    } else {//到了最外层事务了,提交或回滚
        try {
            if (successful) {
                mConnection.execute("COMMIT;", null, cancellationSignal); // might throw
            } else {
                mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
            }
        } finally {
            releaseConnection(); // might throw
        }
    }

    if (listenerException != null) {
        throw listenerException;
    }
}

6 总结

(1)总的来说,SQLiteDatabase是线程安全且高效的。它并没有简单地对每次操作加锁,而是使用引用计数和ThreadLocal来保证连接复用的线程安全性,数据一致性则交由SQLite自身去保证,以达到最优性能。
而很多时候我们在业务层封装时反而处处加锁,其实是没有必要的。
(2)SQLiteDatabase的内部实现会让每个线程单独持有一个数据库连接(不一定是创建,因为有连接池优化),而不是每个SQLiteDatabase对象对应一个连接。
(3)数据库会给主线程持有的连接提高优先级。如果执行的是读操作或者小量数据的写入操作的话,可能可以满足主线程低延迟的需要。但是还没有具体的数据来支撑这一结论,希望有大牛补充。
(4)多线程下的事务行为本文中未作分析,下一篇会就此问题单独进行讨论。

 
 
原文地址:https://www.cnblogs.com/endv/p/12227745.html