什么是观察者模式?就是有一个家伙在默默的看着你,当你状态到了一定地步就开始搞一些事情,这些事情可能对你有利,也可能不利。当然,一头羊是赶,多头羊也是放,所以一般观察者模式应用于一对比的模式。就好像一个教师观察者一个班的学生一样。任何一个学生违反纪律,教师自动就会把他拎出来“慰问”一下。如果教师不在,但是教师指定了班长或者其它人可以动用“慰问”这个行动,那么能产生同样的结果。明白了吧。
观察者模式是一种松耦合的方式,是一种面向接口编程的应用模式,不知道面向接口编程,那就翻一下书,没啥,实现一个接口而已。知道了什么是观察者模式,那么在MySql中实现插件设计的一种方式就是这种观察者模式,下面就通过具体的代码分析一下。
前面已经讲过了插件架构和handlerton插件应用模式,本篇分析一下观察者模式的插件加载。先看一下几个核心的数据结构:
//sql/rpl_handler.h .cc
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
plugin_ref plugin;
Observer_info(void *ob, st_plugin_int *p);
};
class Delegate {
public:
typedef List Observer_info_list;
typedef List_iterator Observer_info_iterator;
explicit Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
PSI_rwlock_key key
#endif
);
virtual ~Delegate();
int add_observer(void *observer, st_plugin_int *plugin);
int remove_observer(void *observer);
Observer_info_iterator observer_info_iter();
bool is_empty();
int read_lock();
int write_lock();
int unlock();
bool is_inited();
void update_lock_type();
void update_plugin_ref_count();
bool use_rw_lock_type();
bool use_spin_lock_type();
private:
Observer_info_list observer_info_list;
mysql_rwlock_t lock;
lock::Shared_spin_lock m_spin_lock;
MEM_ROOT memroot;
bool inited;
std::atomic m_configured_lock_type;
std::atomic m_acquired_locks;
std::map m_acquired_references;
enum enum_delegate_lock_type {
DELEGATE_OS_LOCK = -1, // Lock used by this class is an OS RW lock
DELEGATE_SPIN_LOCK = 1, // Lock used by this class is a spin lock
};
enum enum_delegate_lock_mode {
DELEGATE_LOCK_MODE_SHARED = 0, // Lock acquired in shared/read mode
DELEGATE_LOCK_MODE_EXCLUSIVE = 1, // Lock acquired in exclusive/write mode
};
void acquire_plugin_ref_count(Observer_info *info);
void lock_it(enum_delegate_lock_mode mode);
};
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
#endif
class Binlog_cache_storage;
class Trans_delegate : public Delegate {
public:
Trans_delegate()
: Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
key_rwlock_Trans_delegate_lock
#endif
) {
}
typedef Trans_observer Observer;
int before_dml(THD *thd, int &result);
int before_commit(THD *thd, bool all, Binlog_cache_storage *trx_cache_log,
Binlog_cache_storage *stmt_cache_log,
ulonglong cache_log_max_size, bool is_atomic_ddl);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
int trans_begin(THD *thd, int &result);
};
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Server_state_delegate_lock;
#endif
class Server_state_delegate : public Delegate {
public:
Server_state_delegate()
: Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
key_rwlock_Server_state_delegate_lock
#endif
) {
}
typedef Server_state_observer Observer;
int before_handle_connection(THD *thd);
int before_recovery(THD *thd);
int after_engine_recovery(THD *thd);
int after_recovery(THD *thd);
int before_server_shutdown(THD *thd);
int after_server_shutdown(THD *thd);
int after_dd_upgrade_from_57(THD *thd);
};
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
#endif
class Binlog_storage_delegate : public Delegate {
public:
Binlog_storage_delegate()
: Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
key_rwlock_Binlog_storage_delegate_lock
#endif
) {
}
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file, my_off_t log_pos);
int after_sync(THD *thd, const char *log_file, my_off_t log_pos);
};
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
#endif
class Binlog_transmit_delegate : public Delegate {
public:
Binlog_transmit_delegate()
: Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
key_rwlock_Binlog_transmit_delegate_lock
#endif
) {
}
typedef Binlog_transmit_observer Observer;
int transmit_start(THD *thd, ushort flags, const char *log_file,
my_off_t log_pos, bool *observe_transmission);
int transmit_stop(THD *thd, ushort flags);
int reserve_header(THD *thd, ushort flags, String *packet);
int before_send_event(THD *thd, ushort flags, String *packet,
const char *log_file, my_off_t log_pos);
int after_send_event(THD *thd, ushort flags, String *packet,
const char *skipped_log_file, my_off_t skipped_log_pos);
int after_reset_master(THD *thd, ushort flags);
};
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
#endif
class Binlog_relay_IO_delegate : public Delegate {
public:
Binlog_relay_IO_delegate()
: Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
key_rwlock_Binlog_relay_IO_delegate_lock
#endif
) {
}
typedef Binlog_relay_IO_observer Observer;
int thread_start(THD *thd, Master_info *mi);
int thread_stop(THD *thd, Master_info *mi);
int applier_start(THD *thd, Master_info *mi);
int applier_stop(THD *thd, Master_info *mi, bool aborted);
int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
int after_read_event(THD *thd, Master_info *mi, const char *packet, ulong len,
const char **event_buf, ulong *event_len);
int after_queue_event(THD *thd, Master_info *mi, const char *event_buf,
ulong event_len, bool synced);
int after_reset_slave(THD *thd, Master_info *mi);
int applier_log_event(THD *thd, int &out);
private:
void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
int delegates_init();
void delegates_destroy();
void delegates_acquire_locks();
void delegates_release_locks();
void delegates_update_lock_type();
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
extern Server_state_delegate *server_state_delegate;
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#define RUN_HOOK(group, hook, args)
(group##_delegate->is_empty() ? 0 : group##_delegate->hook args)
#define NO_HOOK(group) (group##_delegate->is_empty())
int launch_hook_trans_begin(THD *thd, TABLE_LIST *table);
#endif
除了最开始的两个外部变量定义没有拷贝上来,其它都拷贝上来了。别看这两么多,其实很简单,核心就两个类,一个观察者类Observer_info,另外一个观察者事件或者说行为的处理基类。到这里其实就很清晰了,也就是说,观察者方式主要就是通过一个观察者类来监视行动,根据不同的行为来产生具体的代理类(事件类)对象并对事件进行处理。这也是一般新语言的一个事件处理的流程,通过代理或者委托来实际处理事件的运作。
有些人可能不明白,简单解释一下,以前处理行动一般是一个(多个)函数,而在更抽象的机制上是对整个行动事件抽象封装成一个类。这样再看两个类就很清楚了,一定会是你中有我,我中有你,然后是一连中的指针对象或者列表。
在事件对象中,又实现了五个子类:
//sql/replication.h
Trans_delegate:事物相关,主要在事物逻辑中应用。它包含:before_dml,before_commit,before_rollback,after_commit,after_rollback,trans_begin等。
Server_state_delegate:服务状态事件相关,用于监视服务状态启停等处。它主要有before_handle_connection,before_recovery,after_engine_recovery,after_recovery,
before_server_shutdown ,after_server_shutdown。
Binlog_storage_delegate:存储事件相关,主要用于日志同步。它主要有after_flush,after_sync。
Binlog_transmit_delegate:传输事件相关,主要用于传输节点间事件。它主要有:transmit_start,transmit_stop,reserve_header,before_send_event,after_send_event,after_reset_master。
Binlog_relay_IO_delegate:Relay IO事件相关,主要用于主从复制。它主要有:thread_start,thread_stop,applier_start,applier_stop,before_request_transmit,after_read_event,
after_queue_event ,after_reset_slave,applier_log_event
这几个具体的结构的定义在replication.h中,包含有说明中的相关函数指针。而相关函数指针的定义在也这个文件中,只举其中一部分代码来看:
typedef int (*after_flush_t)(Binlog_storage_param *param, const char *log_file,
my_off_t log_pos);
typedef int (*after_sync_t)(Binlog_storage_param *param, const char *log_file,
my_off_t log_pos);
typedef struct Binlog_storage_observer {
uint32 len;
after_flush_t after_flush;
after_sync_t after_sync;
} Binlog_storage_observer;
typedef struct Binlog_transmit_param {
uint32 server_id;
uint32 flags;
static const uint32 F_OBSERVE = 1;
static const uint32 F_DONT_OBSERVE = 2;
void set_observe_flag() { flags |= F_OBSERVE; }
void set_dont_observe_flag() { flags |= F_DONT_OBSERVE; }
bool should_observe() {
return (flags & F_OBSERVE) || !(flags & F_DONT_OBSERVE);
}
更多细节代码,可以参看相关的文件,这里就不再一一指出。
四、Plugin的应用基本流程在明白数据结构以后,基本上如何应用,应该大体的思路就有了。但具体到怎么应用,还得看Mysql中有什么更合适的用法。那首先插件第一件事一定是注册,只有注册到相关的函数指针,具体的插件才会被调用:
//sql/rpl_handler.cc
int register_trans_observer(Trans_observer *observer, void *p) {
return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_trans_observer(Trans_observer *observer, void *) {
return transaction_delegate->remove_observer(observer);
}
int register_binlog_storage_observer(Binlog_storage_observer *observer,
void *p) {
DBUG_TRACE;
int result =
binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
return result;
}
int unregister_binlog_storage_observer(Binlog_storage_observer *observer,
void *) {
return binlog_storage_delegate->remove_observer(observer);
}
int register_server_state_observer(Server_state_observer *observer,
void *plugin_var) {
DBUG_TRACE;
int result = server_state_delegate->add_observer(observer,
(st_plugin_int *)plugin_var);
return result;
}
int unregister_server_state_observer(Server_state_observer *observer, void *) {
DBUG_TRACE;
int result = server_state_delegate->remove_observer(observer);
return result;
}
int register_binlog_transmit_observer(Binlog_transmit_observer *observer,
void *p) {
return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
void *) {
return binlog_transmit_delegate->remove_observer(observer);
}
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
void *p) {
return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
void *) {
return binlog_relay_io_delegate->remove_observer(observer);
}
正如前面所讲其实都是调用了add_observer:
int Delegate::add_observer(void *observer, st_plugin_int *plugin) {
int ret = false;
if (!inited) return true;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info = iter++;
while (info && info->observer != observer) info = iter++;
if (!info) {
info = new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret = true;
else if (this->use_spin_lock_type())
acquire_plugin_ref_count(info);
} else
ret = true;
unlock();
return ret;
}
int Delegate::remove_observer(void *observer) {
int ret = false;
if (!inited) return true;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info = iter++;
while (info && info->observer != observer) info = iter++;
if (info) {
iter.remove();
delete info;
} else
ret = true;
unlock();
return ret;
}
在文件中有几个宏需要看一下:
//rpl_handler.h #define RUN_HOOK(group, hook, args) (group##_delegate->is_empty() ? 0 : group##_delegate->hook args) #define NO_HOOK(group) (group##_delegate->is_empty()) //rpl_handler.cc #define FOREACH_OBSERVER(r, f, args) Prealloced_arrayplugins(PSI_NOT_INSTRUMENTED); read_lock(); Observer_info_iterator iter = observer_info_iter(); Observer_info *info = iter++; bool replication_optimize_for_static_plugin_config = this->use_spin_lock_type(); for (; info; info = iter++) { plugin_ref plugin = (replication_optimize_for_static_plugin_config ? info->plugin : my_plugin_lock(0, &info->plugin)); if (!plugin) { r = 0; break; } if (!replication_optimize_for_static_plugin_config) plugins.push_back(plugin); if (((Observer *)info->observer)->f && ((Observer *)info->observer)->f args) { r = 1; LogEvent() .prio(ERROR_LEVEL) .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED) .subsys(LOG_SUBSYSTEM_TAG) .function(#f) .message("Run function '" #f "' in plugin '%s' failed", info->plugin_int->name.str); break; } } unlock(); if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size()); #define FOREACH_OBSERVER_ERROR_OUT(r, f, args, out) Prealloced_array plugins(PSI_NOT_INSTRUMENTED); read_lock(); Observer_info_iterator iter = observer_info_iter(); Observer_info *info = iter++; bool replication_optimize_for_static_plugin_config = this->use_spin_lock_type(); int error_out = 0; for (; info; info = iter++) { plugin_ref plugin = (replication_optimize_for_static_plugin_config ? info->plugin : my_plugin_lock(0, &info->plugin)); if (!plugin) { r = 0; break; } if (!replication_optimize_for_static_plugin_config) plugins.push_back(plugin); bool hook_error = false; hook_error = ((Observer *)info->observer)->f(args, error_out); out += error_out; if (hook_error) { r = 1; LogEvent() .prio(ERROR_LEVEL) .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED) .subsys(LOG_SUBSYSTEM_TAG) .function(#f) .message("Run function '" #f "' in plugin '%s' failed", info->plugin_int->name.str); break; } } unlock(); if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());
看名字都很好理解,一个是具体调用函数指针,一个是遍历,最后一个是异常处理。
注册以后就可以调用了,看下面的代码:
首先要初始化等相关:
int delegates_init() {
alignas(Trans_delegate) static char place_trans_mem[sizeof(Trans_delegate)];
alignas(Binlog_storage_delegate) static char
place_storage_mem[sizeof(Binlog_storage_delegate)];
alignas(Server_state_delegate) static char
place_state_mem[sizeof(Server_state_delegate)];
alignas(Binlog_transmit_delegate) static char
place_transmit_mem[sizeof(Binlog_transmit_delegate)];
alignas(Binlog_relay_IO_delegate) static char
place_relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
transaction_delegate = new (place_trans_mem) Trans_delegate;
if (!transaction_delegate->is_inited()) {
LogErr(ERROR_LEVEL, ER_RPL_TRX_DELEGATES_INIT_FAILED);
return 1;
}
binlog_storage_delegate = new (place_storage_mem) Binlog_storage_delegate;
if (!binlog_storage_delegate->is_inited()) {
LogErr(ERROR_LEVEL, ER_RPL_BINLOG_STORAGE_DELEGATES_INIT_FAILED);
return 1;
}
server_state_delegate = new (place_state_mem) Server_state_delegate;
binlog_transmit_delegate = new (place_transmit_mem) Binlog_transmit_delegate;
if (!binlog_transmit_delegate->is_inited()) {
LogErr(ERROR_LEVEL, ER_RPL_BINLOG_TRANSMIT_DELEGATES_INIT_FAILED);
return 1;
}
binlog_relay_io_delegate = new (place_relay_io_mem) Binlog_relay_IO_delegate;
if (!binlog_relay_io_delegate->is_inited()) {
LogErr(ERROR_LEVEL, ER_RPL_BINLOG_RELAY_DELEGATES_INIT_FAILED);
return 1;
}
return 0;
}
void delegates_destroy() {
if (transaction_delegate) transaction_delegate->~Trans_delegate();
if (binlog_storage_delegate)
binlog_storage_delegate->~Binlog_storage_delegate();
if (server_state_delegate) server_state_delegate->~Server_state_delegate();
if (binlog_transmit_delegate)
binlog_transmit_delegate->~Binlog_transmit_delegate();
if (binlog_relay_io_delegate)
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
}
static void delegates_update_plugin_ref_count() {
if (transaction_delegate) transaction_delegate->update_plugin_ref_count();
if (binlog_storage_delegate)
binlog_storage_delegate->update_plugin_ref_count();
if (server_state_delegate) server_state_delegate->update_plugin_ref_count();
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate)
binlog_transmit_delegate->update_plugin_ref_count();
if (binlog_relay_io_delegate)
binlog_relay_io_delegate->update_plugin_ref_count();
#endif
}
void delegates_acquire_locks() {
if (transaction_delegate) transaction_delegate->write_lock();
if (binlog_storage_delegate) binlog_storage_delegate->write_lock();
if (server_state_delegate) server_state_delegate->write_lock();
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate) binlog_transmit_delegate->write_lock();
if (binlog_relay_io_delegate) binlog_relay_io_delegate->write_lock();
#endif
}
void delegates_release_locks() {
if (transaction_delegate) transaction_delegate->unlock();
if (binlog_storage_delegate) binlog_storage_delegate->unlock();
if (server_state_delegate) server_state_delegate->unlock();
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate) binlog_transmit_delegate->unlock();
if (binlog_relay_io_delegate) binlog_relay_io_delegate->unlock();
#endif
}
void delegates_update_lock_type() {
delegates_update_plugin_ref_count();
if (transaction_delegate) transaction_delegate->update_lock_type();
if (binlog_storage_delegate) binlog_storage_delegate->update_lock_type();
if (server_state_delegate) server_state_delegate->update_lock_type();
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate) binlog_transmit_delegate->update_lock_type();
if (binlog_relay_io_delegate) binlog_relay_io_delegate->update_lock_type();
#endif
}
然后看一个应用代码来分析一下流程:
static int request_dump(THD *thd, MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi,
bool *suppress_warnings) {
DBUG_TRACE;
enum_server_command command =
mi->is_auto_position() ? COM_BINLOG_DUMP_GTID : COM_BINLOG_DUMP;
uint binlog_flags = 0;
//看一下这个RUN_HOOK
*suppress_warnings = false;
if (RUN_HOOK(binlog_relay_io, before_request_transmit,
(thd, mi, binlog_flags)))
return 1;
......
}
RUN_HOOK展开在上面已经有了,其实就是首先判断观察者队列是否为空,否则直接调用binlog_relay_io->before_request_transmit函数。即:
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, Master_info *mi,
ushort flags) {
Binlog_relay_IO_param param;
init_param(¶m, mi);
param.server_id = thd->server_id;
param.thread_id = thd->thread_id();
int ret = 0;
FOREACH_OBSERVER(ret, before_request_transmit, (¶m, (uint32)flags));
return ret;
}
其实就绪一下步骤就是:
1、建立观察者事件对象delegate(根据实际应用的五种之一)用于注册。同时将函数指针赋值。
2、创建观察者对象并指向事件对象。
3、调用注册函数将相关Observer_infoPush到注册列表
4、在关键点调用RUN_HOOK.
5、通过宏FOREACH_OBSERVER遍历相关函数
设计应用还是和插件配合的相当默契,值得学习。
学以致用,这才是学习的目的,看别人的代码,最终还是要学习别人的长处,吸收引用,加以消化整理,为已所用。
加油吧,归来的少年!



