目录
前言
将 LiveData 与 Room 一起使用
观察数据表变化的本质是触发器
触发器
DAO实现类源码分析
InvalidationTracker
LiveData的使用
RoomTrackingLiveData对象
RoomTrackingLiveData.mRefreshRunnable
InvalidationTracker.Observer
InvalidationTracker.addObserver
InvalidationTracker.mObserverMap
InvalidationTracker.mRefreshRunnable
refreshVersionsAsync
refreshVersionsSync
distinctUntilChanged
小结
前言
众所周知,为防止查询阻止界面,Room 不允许在主线程上访问数据库。此限制意味着必须将 DAO 查询设为异步,当然如果你一定要在主线程中查数据库,可以配置DatabaseConfiguration的allowMainThreadQueries为true来规避。
Room 库包含与多个不同框架的集成,以提供异步查询执行功能。
DAO 查询分为三类:
虽然官方已经推荐从 LiveData 迁移到 Kotlin 数据流,但是某些开源的项目还依旧使用LiveData来观察数据表的变化,为了避免知其然不知其所以然带来的其它问题,本文从源码的角度分析,LiveData是如何观察Room数据表的。
将 LiveData 与 Room 一起使用
先回顾LiveData和Room如何一起使用。
Room 持久性库支持返回 LiveData 对象的可观察查询。可观察查询属于数据库访问对象 (DAO) 的一部分。
当数据库更新时,Room 会生成更新 LiveData 对象所需的所有代码。在需要时,生成的代码会在后台线程上异步运行查询。此模式有助于使界面中显示的数据与存储在数据库中的数据保持同步。您可以在 Room 持久性库指南中详细了解 Room 和 DAO。
假设有一个AudioRecordFile表,界面需要显示表里的所有数据,并且在数据表发生新增、删除,修改时自动同步到界面,相关代码如下:
@Entity(tableName = "AudioRecordFile")
data class AudioRecordFile constructor(
@PrimaryKey(autoGenerate = true)
@ColumnInfo(name = "id") var id: Long,
@NonNull @ColumnInfo(name = "createTime") var createTime: Long,
@NonNull @ColumnInfo(name = "duration") var duration: String,
@NonNull @ColumnInfo(name = "fileName") var fileName: String,
@NonNull @ColumnInfo(name = "filePath") var filePath: String
)
@Dao
interface AudioRecordFileDao {
@Query("SELECt * FROM AudioRecordFile order by createTime DESC ")
fun observeAudioRecordFileList(): LiveData
}
可以看到,在DAO类中,定义了一个observeAudioRecordFileList方法,返回了一个LiveData对象,对应的Value是List。
然后在ViewModel中得到这个LiveData对象:
class AudioRecordFileLocalDataSource @Inject constructor(
private val audioRecordFileDao: AudioRecordFileDao,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) :
IAudioRecordFileDataSource {
......
override fun observeAudioRecordFileList(): LiveData> {
return audioRecordFileDao.observeAudioRecordFileList().map {
Result.success(it)
}
}
......
}
class AudioRecordFileRepository constructor(
private val audioRecordLocalDataSource: IAudioRecordFileDataSource,
private val audioRecordRemoteDataSource: IAudioRecordFileDataSource,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : IAudioRecordFileRepository {
......
override fun observeAudioRecordFileList(): LiveData> {
return audioRecordLocalDataSource.observeAudioRecordFileList()
}
......
}
@HiltViewModel
class AudioRecordListViewModel @Inject constructor(
private val audioRecordFileRepository: IAudioRecordFileRepository,
private val audioTrackHandler: AudioTrackHandler
) :
ViewModel() {
private val _forceUpdate = MutableLiveData(false)
private val _items: LiveData = _forceUpdate.switchMap { forceUpdate ->
if (forceUpdate) {
_dataLoading.value = true
viewModelScope.launch {
audioRecordFileRepository.refreshAudioRecordFileList()
_dataLoading.value = false
}
}
LogUtil.i(TAG, "observeAudioRecordFileList ")
audioRecordFileRepository.observeAudioRecordFileList().distinctUntilChanged()
.switchMap { handleAudioRecordFileList(it) }
}
val items: LiveData = _items
private fun loadTasks(forceUpdate: Boolean) {
_forceUpdate.value = forceUpdate
}
fun refresh() {
loadTasks(true)
}
}
可以看到,ViewModel中是直接调用AudioRecordFileDao的observeAudioRecordFileList方法的,并且通过LiveData的扩展方法distinctUntilChange创建一个新的 LiveData 对象提供给外部,配合DataBinding或者直接Observe使用,当AudioRecordFile表有变动时,驱动界面更新。
观察数据表变化的本质是触发器
我们先思考一个问题,为什么DAO类的方法返回LiveData对象就能够实现数据表发生变化时通知界面更新?
实现这个功能,我们需要知道数据表何时发生变化,这样我们才能在观察变化时调用LiveData的setValue或postValue去更新数据。那关键就在于怎么判断数据表是否发生变化?
触发器
答案就是触发器(TRIGGER)。触发器是在指定的数据库事件发生时自动执行的数据库操作。可以指定一个触发器,只要发生特定数据库表的DELETE,INSERT或UPDATE,或者在表的一个或多个指定列上发生UPDATE时触发。
举个例子
假设客户记录存储在“customers”表中,并且该订单记录存储在“orders”表中,则以下UPDATE触发器可确保在客户更改其地址时重定向所有相关订单:
CREATE TRIGGER update_customer_address UPDATE OF address ON customers
BEGIN
UPDATE orders SET address = new.address WHERe customer_name = old.name;
END;
安装此触发器后,执行语句:
UPDATE customers SET address = '1 Main St.' WHERe name = 'Jack Jones';
导致以下内容被自动执行:
UPDATE orders SET address = '1 Main St.' WHERe customer_name = 'Jack Jones';
是不是很简单,也就是说,如果我们针对指定表的的DELETE,INSERT或UPDATE分别创建相应的触发器,触发后直接执行查询语句,将新的数据返回,这样我们就能够实现观察数据表发生变化并返回最新的数据了。
CREATE TRIGGER的扩展于阅读:
CREATE TRIGGER (Language) - Sqlite 中文开发手册 - 开发者手册 - 云+社区 - 腾讯云
还有其它思路吗?当然有,比如我们通过AOP监控AudioRecordFile表的每一次SQL语句执行,如果涉及到DELETE,INSERT或UPDATE等操作时,我们就在操作结束后查询最新数据并返回。但是这样很麻烦,一点也不优雅,是吧。
DAO实现类源码分析
那Room是使用触发器来实现观察数据表的吗?带着疑问,我们去看看DAO实现类编译后的源码。
observeAudioRecordFileList方法的源码实现如下:
@Override public LiveDataobserveAudioRecordFileList() { final String _sql = "SELECT * FROM AudioRecordFile order by createTime DESC "; final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0); return __db.getInvalidationTracker().createLiveData(new String[]{"AudioRecordFile"}, false, new Callable
() { @Override public List call() throws Exception { final Cursor _cursor = DBUtil.query(__db, _statement, false, null); try { final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id"); final int _cursorIndexOfCreateTime = CursorUtil.getColumnIndexOrThrow(_cursor, "createTime"); final int _cursorIndexOfDuration = CursorUtil.getColumnIndexOrThrow(_cursor, "duration"); final int _cursorIndexOfFileName = CursorUtil.getColumnIndexOrThrow(_cursor, "fileName"); final int _cursorIndexOfFilePath = CursorUtil.getColumnIndexOrThrow(_cursor, "filePath"); final List _result = new ArrayList(_cursor.getCount()); while(_cursor.moveTonext()) { final AudioRecordFile _item; final long _tmpId; _tmpId = _cursor.getLong(_cursorIndexOfId); final long _tmpCreateTime; _tmpCreateTime = _cursor.getLong(_cursorIndexOfCreateTime); final String _tmpDuration; if (_cursor.isNull(_cursorIndexOfDuration)) { _tmpDuration = null; } else { _tmpDuration = _cursor.getString(_cursorIndexOfDuration); } final String _tmpFileName; if (_cursor.isNull(_cursorIndexOfFileName)) { _tmpFileName = null; } else { _tmpFileName = _cursor.getString(_cursorIndexOfFileName); } final String _tmpFilePath; if (_cursor.isNull(_cursorIndexOfFilePath)) { _tmpFilePath = null; } else { _tmpFilePath = _cursor.getString(_cursorIndexOfFilePath); } _item = new AudioRecordFile(_tmpId,_tmpCreateTime,_tmpDuration,_tmpFileName,_tmpFilePath); _result.add(_item); } return _result; } finally { _cursor.close(); } } @Override protected void finalize() { _statement.release(); } }); }
以上代码中,最终重要的是__db.getInvalidationTracker().createLiveData()这一行,顾名思义,InvalidationTracker的意思是失效追踪器,所谓的失效应该就是新数据更新导致旧数据失效。
跟进去看下InvalidationTracker的相关源码。
InvalidationTracker
// Some details on how the InvalidationTracker works:
// * An in memory table is created with (table_id, invalidated) table_id is a hardcoded int from
// initialization, while invalidated is a boolean bit to indicate if the table has been invalidated.
// * ObservedTableTracker tracks list of tables we should be watching (e.g. adding triggers for).
// * Before each beginTransaction, RoomDatabase invokes InvalidationTracker to sync trigger states.
// * After each endTransaction, RoomDatabase invokes InvalidationTracker to refresh invalidated
// tables.
// * Each update (write operation) on one of the observed tables triggers an update into the
// memory table table, flipping the invalidated flag ON.
// * When multi-instance invalidation is turned on, MultiInstanceInvalidationClient will be created.
// It works as an Observer, and notifies other instances of table invalidation.
public class InvalidationTracker {
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
private static final String TABLE_ID_COLUMN_NAME = "table_id";
private static final String INVALIDATED_COLUMN_NAME = "invalidated";
private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
+ "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
+ INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";
@VisibleForTesting
static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
+ " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERe " + INVALIDATED_COLUMN_NAME + " = 1 ";
@VisibleForTesting
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATe_TABLE_NAME
+ " WHERe " + INVALIDATED_COLUMN_NAME + " = 1;";
@NonNull
final HashMap mTableIdLookup;
final String[] mTableNames;
@NonNull
private Map> mViewTables;
@Nullable
AutoCloser mAutoCloser = null;
@SuppressWarnings("WeakerAccess")
final RoomDatabase mDatabase;
AtomicBoolean mPendingRefresh = new AtomicBoolean(false);
private volatile boolean mInitialized = false;
@SuppressWarnings("WeakerAccess")
volatile SupportSQLiteStatement mCleanupStatement;
private ObservedTableTracker mObservedTableTracker;
private final InvalidationLiveDataContainer mInvalidationLiveDataContainer;
// should be accessed with synchronization only.
@VisibleForTesting
@SuppressLint("RestrictedApi")
final SafeIterableMap mObserverMap = new SafeIterableMap<>();
private MultiInstanceInvalidationClient mMultiInstanceInvalidationClient;
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
public InvalidationTracker(RoomDatabase database, String... tableNames) {
this(database, new HashMap(), Collections.>emptyMap(),
tableNames);
}
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
public InvalidationTracker(RoomDatabase database, Map shadowTablesMap,
Map> viewTables, String... tableNames) {
mDatabase = database;
mObservedTableTracker = new ObservedTableTracker(tableNames.length);
mTableIdLookup = new HashMap<>();
mViewTables = viewTables;
mInvalidationLiveDataContainer = new InvalidationLiveDataContainer(mDatabase);
final int size = tableNames.length;
mTableNames = new String[size];
for (int id = 0; id < size; id++) {
final String tableName = tableNames[id].toLowerCase(Locale.US);
mTableIdLookup.put(tableName, id);
String shadowTableName = shadowTablesMap.get(tableNames[id]);
if (shadowTableName != null) {
mTableNames[id] = shadowTableName.toLowerCase(Locale.US);
} else {
mTableNames[id] = tableName;
}
}
// Adjust table id lookup for those tables whose shadow table is another already mapped
// table (e.g. external content fts tables).
for (Map.Entry shadowTableEntry : shadowTablesMap.entrySet()) {
String shadowTableName = shadowTableEntry.getValue().toLowerCase(Locale.US);
if (mTableIdLookup.containsKey(shadowTableName)) {
String tableName = shadowTableEntry.getKey().toLowerCase(Locale.US);
mTableIdLookup.put(tableName, mTableIdLookup.get(shadowTableName));
}
}
}
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERe ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
}
可以看到,定义了三个触发器,对应修改,删除,插入:
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
并且在startTrackingTable和stopTrackingTable,分别创建和删除这些触发器,验证了我们前面的想法。
但是,虽然有了触发器,但触发器的相关操作并不是去查询数据表,而是去更新了一个名为room_table_modification_log的表,这样的目的是什么?这个问题先卖个关子,我们继续往下看。
LiveData的使用
RoomTrackingLiveData对象
__db.getInvalidationTracker().createLiveData()代码中,createLiveData方法则最终调用InvalidationLiveDataContainer类的create的方法则返回了一个RoomTrackingLiveData对象。
class InvalidationLiveDataContainer {
@SuppressWarnings("WeakerAccess")
@VisibleForTesting
final Set mLiveDataSet = Collections.newSetFromMap(
new IdentityHashMap()
);
private final RoomDatabase mDatabase;
InvalidationLiveDataContainer(RoomDatabase database) {
mDatabase = database;
}
LiveData create(String[] tableNames, boolean inTransaction,
Callable computeFunction) {
return new RoomTrackingLiveData<>(mDatabase, this, inTransaction, computeFunction,
tableNames);
}
void onActive(LiveData liveData) {
mLiveDataSet.add(liveData);
}
void onInactive(LiveData liveData) {
mLiveDataSet.remove(liveData);
}
}
class InvalidationLiveDataContainer {
@SuppressWarnings("WeakerAccess")
@VisibleForTesting
final Set mLiveDataSet = Collections.newSetFromMap(
new IdentityHashMap()
);
private final RoomDatabase mDatabase;
InvalidationLiveDataContainer(RoomDatabase database) {
mDatabase = database;
}
LiveData create(String[] tableNames, boolean inTransaction,
Callable computeFunction) {
return new RoomTrackingLiveData<>(mDatabase, this, inTransaction, computeFunction,
tableNames);
}
void onActive(LiveData liveData) {
mLiveDataSet.add(liveData);
}
void onInactive(LiveData liveData) {
mLiveDataSet.remove(liveData);
}
}
@SuppressLint("RestrictedApi")
RoomTrackingLiveData(
RoomDatabase database,
InvalidationLiveDataContainer container,
boolean inTransaction,
Callable computeFunction,
String[] tableNames) {
mDatabase = database;
mInTransaction = inTransaction;
mComputeFunction = computeFunction;
mContainer = container;
mObserver = new InvalidationTracker.Observer(tableNames) {
@Override
public void onInvalidated(@NonNull Set tables) {
ArchTaskExecutor.getInstance().executeonMainThread(mInvalidationRunnable);
}
};
}
我们知道,更新LiveData的值,要么调用setValue,要么调用postValue,也就是说,触发器之后的操作需要更新最新的表数据时,就必须然要调用以上的setValue/postValue,顺着这个思路,我们去查看RoomTrackingLiveData的代码,发现mRefreshRunnable有调用postValue。
RoomTrackingLiveData.mRefreshRunnable
@SuppressWarnings("WeakerAccess")
final Runnable mRefreshRunnable = new Runnable() {
@WorkerThread
@Override
public void run() {
if (mRegisteredObserver.compareAndSet(false, true)) {
mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
}
boolean computed;
do {
computed = false;
// compute can happen only in 1 thread but no reason to lock others.
if (mComputing.compareAndSet(false, true)) {
// as long as it is invalid, keep computing.
try {
T value = null;
while (mInvalid.compareAndSet(true, false)) {
computed = true;
try {
value = mComputeFunction.call();
} catch (Exception e) {
throw new RuntimeException("Exception while computing database"
+ " live data.", e);
}
}
if (computed) {
postValue(value);
}
} finally {
// release compute lock
mComputing.set(false);
}
}
// check invalid after releasing compute lock to avoid the following scenario.
// Thread A runs compute()
// Thread A checks invalid, it is false
// Main thread sets invalid to true
// Thread B runs, fails to acquire compute lock and skips
// Thread A releases compute lock
// We've left invalid in set state. The check below recovers.
} while (computed && mInvalid.get());
}
};
先不管其它细枝末节,可以看到postValue的value是调用mComputeFunction.call();返回的,mComputeFunction则对应前面observeAudioRecordFileList源码中createLiveData的参数:
@Override public LiveDataobserveAudioRecordFileList() { final String _sql = "SELECt * FROM AudioRecordFile order by createTime DESC "; final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 0); return __db.getInvalidationTracker().createLiveData(new String[]{"AudioRecordFile"}, false, new Callable
() { @Override public List call() throws Exception { final Cursor _cursor = DBUtil.query(__db, _statement, false, null); try { final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id"); final int _cursorIndexOfCreateTime = CursorUtil.getColumnIndexOrThrow(_cursor, "createTime"); final int _cursorIndexOfDuration = CursorUtil.getColumnIndexOrThrow(_cursor, "duration"); final int _cursorIndexOfFileName = CursorUtil.getColumnIndexOrThrow(_cursor, "fileName"); final int _cursorIndexOfFilePath = CursorUtil.getColumnIndexOrThrow(_cursor, "filePath"); final List _result = new ArrayList(_cursor.getCount()); while(_cursor.moveTonext()) { final AudioRecordFile _item; final long _tmpId; _tmpId = _cursor.getLong(_cursorIndexOfId); final long _tmpCreateTime; _tmpCreateTime = _cursor.getLong(_cursorIndexOfCreateTime); final String _tmpDuration; if (_cursor.isNull(_cursorIndexOfDuration)) { _tmpDuration = null; } else { _tmpDuration = _cursor.getString(_cursorIndexOfDuration); } final String _tmpFileName; if (_cursor.isNull(_cursorIndexOfFileName)) { _tmpFileName = null; } else { _tmpFileName = _cursor.getString(_cursorIndexOfFileName); } final String _tmpFilePath; if (_cursor.isNull(_cursorIndexOfFilePath)) { _tmpFilePath = null; } else { _tmpFilePath = _cursor.getString(_cursorIndexOfFilePath); } _item = new AudioRecordFile(_tmpId,_tmpCreateTime,_tmpDuration,_tmpFileName,_tmpFilePath); _result.add(_item); } return _result; } finally { _cursor.close(); } } @Override protected void finalize() { _statement.release(); } });
可以看到value就是call方法的返回值,返回的是对应sql语句查询返回的数据。那剩下的关键就是,触发器的操作是怎么最终调用到mRefreshRunnable的。
继续看RoomTrackingLiveData源码,可以看到只有以下两个地方执行了mRefreshRunnable:
@SuppressWarnings("WeakerAccess")
final Runnable mInvalidationRunnable = new Runnable() {
@MainThread
@Override
public void run() {
boolean isActive = hasActiveObservers();
if (mInvalid.compareAndSet(false, true)) {
if (isActive) {
getQueryExecutor().execute(mRefreshRunnable);
}
}
}
};
@Override
protected void onActive() {
super.onActive();
mContainer.onActive(this);
getQueryExecutor().execute(mRefreshRunnable);
}
InvalidationTracker.Observer
onActive可以先不用管,我们看mInvalidationRunnable,发现是InvalidationTracker.Observer类中的onInvalidated方法中执行的mInvalidationRunnable:
RoomTrackingLiveData(
RoomDatabase database,
InvalidationLiveDataContainer container,
boolean inTransaction,
Callable computeFunction,
String[] tableNames) {
mDatabase = database;
mInTransaction = inTransaction;
mComputeFunction = computeFunction;
mContainer = container;
mObserver = new InvalidationTracker.Observer(tableNames) {
@Override
public void onInvalidated(@NonNull Set tables) {
ArchTaskExecutor.getInstance().executeonMainThread(mInvalidationRunnable);
}
};
}
而这个mObserver又被add到mDatabase的InvalidationTracker中:
if (mRegisteredObserver.compareAndSet(false, true)) {
mDatabase.getInvalidationTracker().addWeakObserver(mObserver);
}
最终put进了mObserverMap。
InvalidationTracker.addObserver
@SuppressLint("RestrictedApi")
@WorkerThread
public void addObserver(@NonNull Observer observer) {
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
for (int i = 0; i < size; i++) {
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
}
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
syncTriggers();
}
}
这样一来,我们猜测触发器最终肯定直接或者间接的遍历mObserverMap,并调用对应对饮Observer的onInvalidated方法。
InvalidationTracker.mObserverMap
顺着思路继续继续查找遍历mObserverMap的代码,发现两个地方:
@RestrictTo(RestrictTo.Scope.LIBRARY)
@VisibleForTesting(otherwise = VisibleForTesting.PACKAGE_PRIVATE)
public void notifyObserversByTableNames(String... tables) {
synchronized (mObserverMap) {
for (Map.Entry entry : mObserverMap) {
if (!entry.getKey().isRemote()) {
entry.getValue().notifyByTableNames(tables);
}
}
}
}
根据注释,notifyObserversByTableNames主要是用于 InvalidationTracker 无法检测到数据表更新的场景,如其它进程更新了表数据,InvalidationTracker检测不到(为啥?触发器应该能触发吧?继续往下看吧),此时可以调用notifyObserversByTableNames强制遍历所有的Observer进行通知刷新。
notifyObserversByTableNames不是重点需要关心的,继续往下看。
InvalidationTracker.mRefreshRunnable
@VisibleForTesting
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
Set invalidatedTableIds = null;
closeLock.lock();
try {
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
if (mDatabase.mWriteAheadLoggingEnabled) {
// This transaction has to be on the underlying DB rather than the RoomDatabase
// in order to avoid a recursive loop after endTransaction.
SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase();
db.beginTransactionNonExclusive();
try {
invalidatedTableIds = checkUpdatedTable();
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
} else {
invalidatedTableIds = checkUpdatedTable();
}
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
if (mAutoCloser != null) {
mAutoCloser.decrementCountAndScheduleClose();
}
}
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry entry : mObserverMap) {
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
}
private Set checkUpdatedTable() {
HashSet invalidatedTableIds = new HashSet<>();
Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL));
//noinspection TryFinallyCanBeTryWithResources
try {
while (cursor.moveTonext()) {
final int tableId = cursor.getInt(0);
invalidatedTableIds.add(tableId);
}
} finally {
cursor.close();
}
if (!invalidatedTableIds.isEmpty()) {
mCleanupStatement.executeUpdateDelete();
}
return invalidatedTableIds;
}
};
mRefreshRunnable代码比较长,我们耐心看:
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry entry : mObserverMap) {
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
void notifyByTableInvalidStatus(Set invalidatedTablesIds) {
Set invalidatedTables = null;
final int size = mTableIds.length;
for (int index = 0; index < size; index++) {
final int tableId = mTableIds[index];
if (invalidatedTablesIds.contains(tableId)) {
if (size == 1) {
// Optimization for a single-table observer
invalidatedTables = mSingleTableSet;
} else {
if (invalidatedTables == null) {
invalidatedTables = new HashSet<>(size);
}
invalidatedTables.add(mTableNames[index]);
}
}
}
if (invalidatedTables != null) {
mObserver.onInvalidated(invalidatedTables);
}
}
可以看到mObserverMap遍历后会调用notifyByTableInvalidStatus方法,notifyByTableInvalidStatus方法会直接调用mObserver.onInvalidated(invalidatedTables),最终执行查询数据表的操作,并将返回值通过LiveData的postValue发出去。
前面我们提到room_table_modification_log的表,作用是什么?mRefreshRunnable告诉我们答案。
private SetcheckUpdatedTable() { HashSet invalidatedTableIds = new HashSet<>(); Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)); //noinspection TryFinallyCanBeTryWithResources try { while (cursor.moveTonext()) { final int tableId = cursor.getInt(0); invalidatedTableIds.add(tableId); } } finally { cursor.close(); } if (!invalidatedTableIds.isEmpty()) { mCleanupStatement.executeUpdateDelete(); } return invalidatedTableIds; }
可以看到mRefreshRunnable会调用checkUpdatedTable方法,查询room_table_modification_log表中记录的发生变化的表得到invalidatedTablesIds,如果invalidatedTablesIds不为空,并且包含我们设置返回LiveData的表,则回调对应的Observer的onInvalidated方法。
接下来只需要搞清楚mRefreshRunnable什么时候会执行的即可。mRefreshRunnable有两个调用的地方,我们继续往下看。
refreshVersionsAsync
@SuppressWarnings("WeakerAccess")
public void refreshVersionsAsync() {
// TODO we should consider doing this sync instead of async.
if (mPendingRefresh.compareAndSet(false, true)) {
if (mAutoCloser != null) {
// refreshVersionsAsync is called with the ref count incremented from
// RoomDatabase, so the db can't be closed here, but we need to be sure that our
// db isn't closed until refresh is completed. This increment call must be
// matched with a corresponding call in mRefreshRunnable.
mAutoCloser.incrementCountAndEnsureDbIsOpen();
}
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
refreshVersionsAsync明显是一个异步方法,根据注释以及代码调用跟踪,该方法在endTransaction后调用:
@Deprecated
public void endTransaction() {
if (mAutoCloser == null) {
internalEndTransaction();
} else {
mAutoCloser.executeRefCountingFunction(db -> {
internalEndTransaction();
return null;
});
}
}
private void internalEndTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
以新增一个Entity为例:
@Override
public Object insertAudioRecordFile(final AudioRecordFile audioRecordFile,
final Continuation super Long> continuation) {
return CoroutinesRoom.execute(__db, true, new Callable() {
@Override
public Long call() throws Exception {
__db.beginTransaction();
try {
long _result = __insertionAdapterOfAudioRecordFile.insertAndReturnId(audioRecordFile);
__db.setTransactionSuccessful();
return _result;
} finally {
__db.endTransaction();
}
}
}, continuation);
}
当我们向数据库中插入一条数据时,不管成功或者失败,最终都会调用__db.endTransaction(),这样就会触发refreshVersionsAsync执行,注意这里的__db.endTransaction()方法是RoomDatabase对象的,不是SupportSQLiteDatabase对象的。
对于触发器来说,当执行insertAudioRecordFile插入语句的时候,在真正插入数据之前触发器就会执行相关操作修改room_table_modification_log表中的记录,记录发生变动的表的id和对应状态,等插入完成执行__db.endTransaction();操作时,再去查询room_table_modification_log表执行相关LiveData对象的postValue的业务逻辑。
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERe ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
这样一来,完整的支持LiveData的观察数据库变化的功能实现思路就完全理通了。
refreshVersionsSync
第二个地方是refreshVersionsSync,明显是一个同步方法,目前只有LimitOffsetDataSource会调用,鉴于LimitOffsetDataSource没有实际使用过,就不分析了。
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP_PREFIX)
@WorkerThread
public void refreshVersionsSync() {
if (mAutoCloser != null) {
// This increment call must be matched with a corresponding call in mRefreshRunnable.
mAutoCloser.incrementCountAndEnsureDbIsOpen();
}
syncTriggers();
mRefreshRunnable.run();
}
distinctUntilChanged
我们注意到,ViewModel中返回Room的LiveData时,还有个distinctUntilChanged方法,这是一个扩展函数,源码如下
@MainThread @NonNull public staticLiveData distinctUntilChanged(@NonNull LiveData source) { final MediatorLiveData outputLiveData = new MediatorLiveData<>(); outputLiveData.addSource(source, new Observer () { boolean mFirstTime = true; @Override public void onChanged(X currentValue) { final X previousValue = outputLiveData.getValue(); if (mFirstTime || (previousValue == null && currentValue != null) || (previousValue != null && !previousValue.equals(currentValue))) { mFirstTime = false; outputLiveData.setValue(currentValue); } } }); return outputLiveData; }
可以看到distinctUntilChanged使用MediatorLiveData返回一个新的LiveData对象,并且利用onChanged方法,在第一次或Old LiveData对象更新的value与当前New LiveData对象的值不同时,才调用New LiveData对象的setValue方法。
小结
Room除了支持LiveData,还支持Flow,RxJava,但实现观察数据库的功能本质上都是一样的,都是使用触发器实现,这里画几张图加深理解。
图1
图1表示,View去观察Room返回的LiveData,LiveData的值是XXX对应的数据表。
图2
图3
图2和图3表示触发器的创建(包含移除)过程。
图4
图4表示,执行数据库增删改等SQL时,会先触发触发器的相关操作(主要是在room_table_modification_log表中记录发生"UPDATE", "DELETE", "INSERT"操作的数据表的id和状态),待数据库事务结束后,查询room_table_modification_log中是否存在发生变动的表,如果有,则回调对应Observer的onInvaliDated方法,执行对应的mComputeFunction(XXXDao_Impl的createLiveData方法传入的computeFunction,一般是查询表数据),并将mComputeFunction.call()返回的结果通过LiveData的postValue更新。
2022年02月08日补充为避免误导大家,摘抄了Android官方最新的LiveData文档以供参考:
您可能会想在数据层类中使用 LiveData 对象,但 LiveData 并不适合用于处理异步数据流。虽然您可以使用 LiveData 转换和 MediatorLiveData 来实现此目的,但此方法的缺点在于:用于组合数据流的功能非常有限,并且所有 LiveData 对象(包括通过转换创建的对象)都会在主线程中观察到。
如果您需要在应用的其他层中使用数据流,请考虑使用 Kotlin Flow,然后使用 asLiveData() 在 ViewModel 中将 Kotlin Flow 转换成 LiveData。如需详细了解如何搭配使用 Kotlin Flow 与 LiveData,请学习此 Codelab。对于使用 Java 构建的代码库,请考虑将执行器与回调或 RxJava 结合使用。
LiveData 概览 | Android 开发者 | Android Developers
可以看到,官方不再推荐在界面层之外的地方使用LiveData, Java用户建议将执行器与回调或 RxJava 结合使用, Kotlin 用户请考虑使用 Kotlin Flow 。
当时还在Google的JakeWharton 大神也不推荐LiveData配合Room使用,因为 LiveData 无法处理查询过程中产生的异常,而且按照大神的说法,他自己Never used LiveData。类似还有Retrofit配合LiveData的使用场景,也不推荐这么干!
github.com/cashapp/sql…
LiveData 具有生命周期感知能力,遵循 activity 和 fragment 等实体的生命周期。我们可以使用 LiveData 在这些生命周期所有者和生命周期不同的其他对象(例如 ViewModel 对象)之间传递数据。
ViewModel 的主要责任是加载和管理与界面相关的数据,因此非常适合作为用于保留 LiveData 对象的备选方法。我们可以在 ViewModel 中创建 LiveData 对象,然后使用这些对象向界面层公开状态。
activity 和 fragment 不应保留 LiveData 实例,因为它们的用途是显示数据,而不是保持状态。
综上所述,笔者只建议在ViewModel使用LiveData,然而,你有可能使用了基于LiveData实现的事件总线框架:比如SingleLiveEvent,LiveDataBus等,或者干脆在后台线程中调用用 MutableLiveData.postValue() 与界面层通信,无所谓了,生命在于折腾,只要你知其然知其所以然,LiveData用的舒服又有什么关系呢?
当然,有小伙伴已经在ViewModel上用上StateFlow了,那你很牛逼,不过LiveData和StateFlow不是二选一的,选择适合你的即可。
写在最后,首先非常感谢您耐心阅读完整篇文章,坚持写原创且基于实战的文章不是件容易的事,如果本文刚好对您有点帮助,欢迎您给文章点赞评论,您的鼓励是笔者坚持不懈的动力。若文章有不对之处也欢迎指正,再次感谢。



