小巧。快速。可靠。
三选二。

使用 sqlite3_unlock_notify() API

/* This example uses the pthreads API */
#include <pthread.h>

/*
** A pointer to an instance of this structure is passed as the user-context
** pointer when registering for an unlock-notify callback.
*/
typedef struct UnlockNotification UnlockNotification;
struct UnlockNotification {
  int fired;                         /* True after unlock event has occurred */
  pthread_cond_t cond;               /* Condition variable to wait on */
  pthread_mutex_t mutex;             /* Mutex to protect structure */
};

/*
** This function is an unlock-notify callback registered with SQLite.
*/
static void unlock_notify_cb(void **apArg, int nArg){
  int i;
  for(i=0; i<nArg; i++){
    UnlockNotification *p = (UnlockNotification *)apArg[i];
    pthread_mutex_lock(&p->mutex);
    p->fired = 1;
    pthread_cond_signal(&p->cond);
    pthread_mutex_unlock(&p->mutex);
  }
}

/*
** This function assumes that an SQLite API call (either sqlite3_prepare_v2() 
** or sqlite3_step()) has just returned SQLITE_LOCKED. The argument is the
** associated database connection.
**
** This function calls sqlite3_unlock_notify() to register for an 
** unlock-notify callback, then blocks until that callback is delivered 
** and returns SQLITE_OK. The caller should then retry the failed operation.
**
** Or, if sqlite3_unlock_notify() indicates that to block would deadlock 
** the system, then this function returns SQLITE_LOCKED immediately. In 
** this case the caller should not retry the operation and should roll 
** back the current transaction (if any).
*/
static int wait_for_unlock_notify(sqlite3 *db){
  int rc;
  UnlockNotification un;

  /* Initialize the UnlockNotification structure. */
  un.fired = 0;
  pthread_mutex_init(&un.mutex, 0);
  pthread_cond_init(&un.cond, 0);

  /* Register for an unlock-notify callback. */
  rc = sqlite3_unlock_notify(db, unlock_notify_cb, (void *)&un);
  assert( rc==SQLITE_LOCKED || rc==SQLITE_OK );

  /* The call to sqlite3_unlock_notify() always returns either SQLITE_LOCKED 
  ** or SQLITE_OK. 
  **
  ** If SQLITE_LOCKED was returned, then the system is deadlocked. In this
  ** case this function needs to return SQLITE_LOCKED to the caller so 
  ** that the current transaction can be rolled back. Otherwise, block
  ** until the unlock-notify callback is invoked, then return SQLITE_OK.
  */
  if( rc==SQLITE_OK ){
    pthread_mutex_lock(&un.mutex);
    if( !un.fired ){
      pthread_cond_wait(&un.cond, &un.mutex);
    }
    pthread_mutex_unlock(&un.mutex);
  }

  /* Destroy the mutex and condition variables. */
  pthread_cond_destroy(&un.cond);
  pthread_mutex_destroy(&un.mutex);

  return rc;
}

/*
** This function is a wrapper around the SQLite function sqlite3_step().
** It functions in the same way as step(), except that if a required
** shared-cache lock cannot be obtained, this function may block waiting for
** the lock to become available. In this scenario the normal API step()
** function always returns SQLITE_LOCKED.
**
** If this function returns SQLITE_LOCKED, the caller should rollback
** the current transaction (if any) and try again later. Otherwise, the
** system may become deadlocked.
*/
int sqlite3_blocking_step(sqlite3_stmt *pStmt){
  int rc;
  while( SQLITE_LOCKED==(rc = sqlite3_step(pStmt)) ){
    rc = wait_for_unlock_notify(sqlite3_db_handle(pStmt));
    if( rc!=SQLITE_OK ) break;
    sqlite3_reset(pStmt);
  }
  return rc;
}

/*
** This function is a wrapper around the SQLite function sqlite3_prepare_v2().
** It functions in the same way as prepare_v2(), except that if a required
** shared-cache lock cannot be obtained, this function may block waiting for
** the lock to become available. In this scenario the normal API prepare_v2()
** function always returns SQLITE_LOCKED.
**
** If this function returns SQLITE_LOCKED, the caller should rollback
** the current transaction (if any) and try again later. Otherwise, the
** system may become deadlocked.
*/
int sqlite3_blocking_prepare_v2(
  sqlite3 *db,              /* Database handle. */
  const char *zSql,         /* UTF-8 encoded SQL statement. */
  int nSql,                 /* Length of zSql in bytes. */
  sqlite3_stmt **ppStmt,    /* OUT: A pointer to the prepared statement */
  const char **pz           /* OUT: End of parsed string */
){
  int rc;
  while( SQLITE_LOCKED==(rc = sqlite3_prepare_v2(db, zSql, nSql, ppStmt, pz)) ){
    rc = wait_for_unlock_notify(db);
    if( rc!=SQLITE_OK ) break;
  }
  return rc;
}

当两个或多个连接以共享缓存模式访问同一个数据库时,会对各个表使用读写(共享和独占)锁,以确保并发执行的事务保持隔离。在写入表之前,必须获取该表的写(独占)锁。在读取之前,必须获取读(共享)锁。连接在完成其事务时释放所有持有的表锁。如果连接无法获取所需的锁,则对 sqlite3_step() 的调用将返回 SQLITE_LOCKED。

虽然不太常见,但如果 sqlite3_prepare()sqlite3_prepare_v2() 无法获取每个附加数据库的 sqlite_schema 表 的读锁,也可能返回 SQLITE_LOCKED。这些 API 需要读取 sqlite_schema 表中包含的模式数据,才能将 SQL 语句编译成 sqlite3_stmt* 对象。

本文介绍了一种使用 SQLite sqlite3_unlock_notify() 接口的技术,使得对 sqlite3_step()sqlite3_prepare_v2() 的调用会阻塞,直到所需的锁可用,而不是立即返回 SQLITE_LOCKED。如果左侧提供的 sqlite3_blocking_step() 或 sqlite3_blocking_prepare_v2() 函数返回 SQLITE_LOCKED,则表示阻塞会导致系统死锁。

sqlite3_unlock_notify() API 仅在库使用预处理器符号 SQLITE_ENABLE_UNLOCK_NOTIFY 定义进行编译时才可用,此处 有其文档。本文不能替代阅读完整的 API 文档!

sqlite3_unlock_notify() 接口设计用于在为每个 数据库连接 分配了单独线程的系统中使用。实现中没有任何内容可以阻止单个线程运行多个数据库连接。但是, sqlite3_unlock_notify() 接口一次只能在一个连接上工作,因此此处提供的锁解析逻辑仅适用于每个线程一个数据库连接。

sqlite3_unlock_notify() API

在对 sqlite3_step()sqlite3_prepare_v2() 的调用返回 SQLITE_LOCKED 后,可以调用 sqlite3_unlock_notify() API 来注册解锁通知回调。在阻止对 sqlite3_step()sqlite3_prepare_v2() 的调用成功的数据库连接持有表锁并完成其事务并释放所有锁后,SQLite 会调用解锁通知回调。例如,如果对 sqlite3_step() 的调用是尝试读取表 X,并且其他一些连接 Y 持有表 X 的写锁,则 sqlite3_step() 将返回 SQLITE_LOCKED。如果随后调用 sqlite3_unlock_notify(),则在连接 Y 的事务结束之后会调用解锁通知回调。解锁通知回调正在等待的连接(在本例中为连接 Y)称为“阻塞连接”。

如果尝试写入数据库表的 sqlite3_step() 调用返回 SQLITE_LOCKED,则可能有多个其他连接持有相关数据库表的读锁。在这种情况下,SQLite 只是任意选择其中一个连接,并在该连接的事务完成后发出解锁通知回调。无论 sqlite3_step() 调用是被一个连接还是多个连接阻塞,当发出相应的解锁通知回调时,都不能保证所需的锁可用,只能保证它可能可用。

发出解锁通知回调时,是在与阻塞连接关联的 sqlite3_step()(或 sqlite3_close())调用中发出的。在解锁通知回调中调用任何 sqlite3_XXX() API 函数都是非法的。预期用途是解锁通知回调将向其他等待线程发出信号或安排稍后执行某些操作。

sqlite3_blocking_step() 函数使用的算法如下

  1. 在提供的语句句柄上调用 sqlite3_step()。如果调用返回除 SQLITE_LOCKED 之外的任何值,则将此值返回给调用方。否则,继续。

  2. 在与提供的语句句柄关联的数据库连接句柄上调用 sqlite3_unlock_notify() 以注册解锁通知回调。如果对 unlock_notify() 的调用返回 SQLITE_LOCKED,则将此值返回给调用方。

  3. 阻塞,直到另一个线程调用解锁通知回调。

  4. 在语句句柄上调用 sqlite3_reset()。由于 SQLITE_LOCKED 错误可能只发生在对 sqlite3_step() 的第一次调用中(一次 sqlite3_step() 调用不可能返回 SQLITE_ROW,然后下一次返回 SQLITE_LOCKED),因此可以在此时重置语句句柄,而不会影响查询结果从调用方的角度来看。如果此时没有调用 sqlite3_reset(),则对 sqlite3_step() 的下一次调用将返回 SQLITE_MISUSE。

  5. 返回步骤 1。

sqlite3_blocking_prepare_v2() 函数使用的算法类似,只是省略了步骤 4(重置语句句柄)。

写者饥饿

多个连接可以同时持有读锁。如果许多线程正在获取重叠的读锁,则可能至少有一个线程始终持有读锁。然后,等待写锁的表将永远等待。这种情况称为“写者饥饿”。

SQLite 帮助应用程序避免写者饥饿。在任何获取表写锁的尝试失败后(因为一个或多个其他连接持有读锁),所有尝试在共享缓存上打开新事务都会失败,直到以下之一为真

尝试打开新的读事务失败会向调用方返回 SQLITE_LOCKED。如果调用方随后调用 sqlite3_unlock_notify() 以注册解锁通知回调,则阻塞连接是当前在共享缓存上具有打开写事务的连接。这可以防止写者饥饿,因为如果无法打开新的读事务,并且假设所有现有读事务最终都会结束,则写者最终将有机会获取所需的写锁。

pthreads API

当 wait_for_unlock_notify() 调用 sqlite3_unlock_notify() 时,阻止 sqlite3_step() 或 sqlite3_prepare_v2() 调用成功的阻塞连接可能已经完成了其事务。在这种情况下,解锁通知回调会立即调用,在 sqlite3_unlock_notify() 返回之前。或者,解锁通知回调可能在调用 sqlite3_unlock_notify() 后但在线程开始等待异步信号之前由第二个线程调用。

如何处理这种潜在的竞争条件完全取决于应用程序使用的线程和同步原语接口。此示例使用 pthreads,这是现代类 Unix 系统(包括 Linux)提供的接口。

pthreads 接口提供 pthread_cond_wait() 函数。此函数允许调用方同时释放互斥锁并开始等待异步信号。使用此函数、一个“已触发”标志和一个互斥锁,可以消除上述竞争条件,如下所示

当调用解锁通知回调时(可能在调用 sqlite3_unlock_notify() 的线程开始等待异步信号之前),它会执行以下操作

  1. 获取互斥锁。
  2. 将“已触发”标志设置为 true。
  3. 尝试向等待线程发送信号。
  4. 释放互斥锁。

当 wait_for_unlock_notify() 线程准备好开始等待解锁通知回调到达时,它会

  1. 获取互斥锁。
  2. 检查“已触发”标志是否已设置。如果是,则解锁通知回调已调用。释放互斥锁并继续。
  3. 原子地释放互斥锁并开始等待异步信号。信号到达时,继续。

这样,无论解锁通知回调是否已调用或正在调用,当 wait_for_unlock_notify() 线程开始阻塞时都没有关系。

可能的增强功能

本文中的代码至少可以通过两种方式改进

即使 sqlite3_unlock_notify() 函数仅允许调用方指定单个用户上下文指针,解锁通知回调也会传递一个此类上下文指针数组。这是因为,如果当阻塞连接完成其事务时,如果有多个解锁通知注册调用相同的 C 函数,则上下文指针会被编组到一个数组中并发出一个回调。如果为每个线程分配一个优先级,那么与其像此实现一样以任意顺序发送线程信号,不如先发送高优先级线程的信号,然后再发送低优先级线程的信号。

如果执行“DROP TABLE”或“DROP INDEX” SQL 命令,并且同一个数据库连接当前有一个或多个正在执行的 SELECT 语句,则会返回 SQLITE_LOCKED。如果在这种情况下调用 sqlite3_unlock_notify(),则会立即调用指定的回调。重新尝试“DROP TABLE”或“DROP INDEX”语句将返回另一个 SQLITE_LOCKED 错误。在左侧显示的 sqlite3_blocking_step() 的实现中,这可能会导致无限循环。

调用方可以通过使用 扩展错误代码 来区分此特殊“DROP TABLE|INDEX”情况和其他情况。当调用 sqlite3_unlock_notify() 时,扩展错误代码为 SQLITE_LOCKED_SHAREDCACHE。否则,在“DROP TABLE|INDEX”情况下,它只是普通的 SQLITE_LOCKED。另一种解决方案可能是限制任何单个查询可以重试的次数(例如 100 次)。虽然这可能不如人们希望的那样高效,但相关情况不太可能经常发生。

此页面上次修改于 2022-01-08 05:02:57 UTC