栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

mysql源码分析——插件访问的观察者模式

mysql源码分析——插件访问的观察者模式

一、观察者模式

什么是观察者模式?就是有一个家伙在默默地看着你,当你的状态到了一定地步,它就开始搞一些事情,这些事情可能对你有利,也可能不利。当然,一只羊是赶,一群羊也是放,所以观察者模式一般应用于一对多的场景。就好像一个教师观察着一个班的学生一样:任何一个学生违反纪律,教师自动就会把他拎出来“慰问”一下。如果教师不在,但是教师指定了班长或者其他人可以动用“慰问”这个行动,那么也能产生同样的结果。明白了吧。
观察者模式是一种松耦合的方式,是一种面向接口编程的应用模式,不知道面向接口编程,那就翻一下书,没啥,实现一个接口而已。知道了什么是观察者模式,那么在MySql中实现插件设计的一种方式就是这种观察者模式,下面就通过具体的代码分析一下。

二、Plugin相关数据结构

前面已经讲过了插件架构和handlerton插件应用模式,本篇分析一下观察者模式的插件加载。先看一下几个核心的数据结构:

//sql/rpl_handler.h .cc
// One registered observer together with the plugin it belongs to.
// Instances are created by Delegate::add_observer and destroyed by
// Delegate::remove_observer.
class Observer_info {
 public:
  // Opaque pointer to the observer structure supplied at registration;
  // delegates cast it back to their own Observer type before use.
  void *observer;
  // Plugin descriptor passed in at registration time.
  st_plugin_int *plugin_int;
  // Plugin reference; presumably derived from plugin_int by the
  // constructor (its definition is not part of this excerpt).
  plugin_ref plugin;

  // ob: observer structure, p: owning plugin descriptor.
  Observer_info(void *ob, st_plugin_int *p);
};


class Delegate {
 public:
  typedef List Observer_info_list;
  typedef List_iterator Observer_info_iterator;

  
  explicit Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
      PSI_rwlock_key key
#endif
  );
  
  virtual ~Delegate();

  
  int add_observer(void *observer, st_plugin_int *plugin);
  
  int remove_observer(void *observer);
  
  Observer_info_iterator observer_info_iter();
  
  bool is_empty();
  
  int read_lock();
  
  int write_lock();
  
  int unlock();
  
  bool is_inited();
  
  void update_lock_type();
  
  void update_plugin_ref_count();
  
  bool use_rw_lock_type();
  
  bool use_spin_lock_type();

 private:
  
  Observer_info_list observer_info_list;
  
  mysql_rwlock_t lock;
  
  lock::Shared_spin_lock m_spin_lock;
  
  MEM_ROOT memroot;
  
  bool inited;
  
  std::atomic m_configured_lock_type;
  
  std::atomic m_acquired_locks;
  
  std::map m_acquired_references;

  enum enum_delegate_lock_type {
    DELEGATE_OS_LOCK = -1,   // Lock used by this class is an OS RW lock
    DELEGATE_SPIN_LOCK = 1,  // Lock used by this class is a spin lock
  };

  enum enum_delegate_lock_mode {
    DELEGATE_LOCK_MODE_SHARED = 0,     // Lock acquired in shared/read mode
    DELEGATE_LOCK_MODE_EXCLUSIVE = 1,  // Lock acquired in exclusive/write mode
  };

  
  void acquire_plugin_ref_count(Observer_info *info);
  
  void lock_it(enum_delegate_lock_mode mode);
};

// Instrumentation key handed to the Delegate base class constructor below;
// only declared when the PSI rwlock interface is compiled in.
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Trans_delegate_lock;
#endif

// Forward declaration; only used as a pointer type in before_commit().
class Binlog_cache_storage;

// Delegate for transaction hooks. Its observers are Trans_observer
// structures; each member function below corresponds to one hook.
class Trans_delegate : public Delegate {
 public:
  Trans_delegate()
      : Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
            key_rwlock_Trans_delegate_lock
#endif
        ) {
  }

  // Observer type that the FOREACH_OBSERVER macros cast info->observer to.
  typedef Trans_observer Observer;

  int before_dml(THD *thd, int &result);
  int before_commit(THD *thd, bool all, Binlog_cache_storage *trx_cache_log,
                    Binlog_cache_storage *stmt_cache_log,
                    ulonglong cache_log_max_size, bool is_atomic_ddl);
  int before_rollback(THD *thd, bool all);
  int after_commit(THD *thd, bool all);
  int after_rollback(THD *thd, bool all);
  int trans_begin(THD *thd, int &result);
};

// Instrumentation key handed to the Delegate base class constructor below;
// only declared when the PSI rwlock interface is compiled in.
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Server_state_delegate_lock;
#endif

// Delegate for server state hooks. Its observers are Server_state_observer
// structures; each member function below corresponds to one hook.
class Server_state_delegate : public Delegate {
 public:
  Server_state_delegate()
      : Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
            key_rwlock_Server_state_delegate_lock
#endif
        ) {
  }

  // Observer type that the FOREACH_OBSERVER macros cast info->observer to.
  typedef Server_state_observer Observer;
  int before_handle_connection(THD *thd);
  int before_recovery(THD *thd);
  int after_engine_recovery(THD *thd);
  int after_recovery(THD *thd);
  int before_server_shutdown(THD *thd);
  int after_server_shutdown(THD *thd);
  int after_dd_upgrade_from_57(THD *thd);
};

// Instrumentation key handed to the Delegate base class constructor below;
// only declared when the PSI rwlock interface is compiled in.
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock;
#endif

// Delegate for binlog storage hooks. Its observers are
// Binlog_storage_observer structures.
class Binlog_storage_delegate : public Delegate {
 public:
  Binlog_storage_delegate()
      : Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
            key_rwlock_Binlog_storage_delegate_lock
#endif
        ) {
  }

  // Observer type that the FOREACH_OBSERVER macros cast info->observer to.
  typedef Binlog_storage_observer Observer;
  // Both hooks receive a log file name and a position within it.
  int after_flush(THD *thd, const char *log_file, my_off_t log_pos);
  int after_sync(THD *thd, const char *log_file, my_off_t log_pos);
};

// Instrumentation key handed to the Delegate base class constructor below;
// only declared when the PSI rwlock interface is compiled in.
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock;
#endif

// Delegate for binlog transmission hooks. Its observers are
// Binlog_transmit_observer structures.
class Binlog_transmit_delegate : public Delegate {
 public:
  Binlog_transmit_delegate()
      : Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
            key_rwlock_Binlog_transmit_delegate_lock
#endif
        ) {
  }

  // Observer type that the FOREACH_OBSERVER macros cast info->observer to.
  typedef Binlog_transmit_observer Observer;
  // observe_transmission is an output parameter.
  int transmit_start(THD *thd, ushort flags, const char *log_file,
                     my_off_t log_pos, bool *observe_transmission);
  int transmit_stop(THD *thd, ushort flags);
  int reserve_header(THD *thd, ushort flags, String *packet);
  int before_send_event(THD *thd, ushort flags, String *packet,
                        const char *log_file, my_off_t log_pos);
  int after_send_event(THD *thd, ushort flags, String *packet,
                       const char *skipped_log_file, my_off_t skipped_log_pos);
  int after_reset_master(THD *thd, ushort flags);
};

// Instrumentation key handed to the Delegate base class constructor below;
// only declared when the PSI rwlock interface is compiled in.
#ifdef HAVE_PSI_RWLOCK_INTERFACE
extern PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock;
#endif

// Delegate for relay-log I/O hooks. Its observers are
// Binlog_relay_IO_observer structures.
class Binlog_relay_IO_delegate : public Delegate {
 public:
  Binlog_relay_IO_delegate()
      : Delegate(
#ifdef HAVE_PSI_RWLOCK_INTERFACE
            key_rwlock_Binlog_relay_IO_delegate_lock
#endif
        ) {
  }

  // Observer type that the FOREACH_OBSERVER macros cast info->observer to.
  typedef Binlog_relay_IO_observer Observer;
  int thread_start(THD *thd, Master_info *mi);
  int thread_stop(THD *thd, Master_info *mi);
  int applier_start(THD *thd, Master_info *mi);
  int applier_stop(THD *thd, Master_info *mi, bool aborted);
  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
  int after_read_event(THD *thd, Master_info *mi, const char *packet, ulong len,
                       const char **event_buf, ulong *event_len);
  int after_queue_event(THD *thd, Master_info *mi, const char *event_buf,
                        ulong event_len, bool synced);
  int after_reset_slave(THD *thd, Master_info *mi);
  int applier_log_event(THD *thd, int &out);

 private:
  // Fills `param` from `mi` before observers are invoked (see
  // before_request_transmit for a caller).
  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};

// Constructs the five global delegates; returns 0 on success, 1 on failure.
int delegates_init();
// Runs the destructor of every delegate that has been constructed.
void delegates_destroy();

// Takes the write lock of every delegate.
void delegates_acquire_locks();

// Releases the lock of every delegate.
void delegates_release_locks();

// Refreshes plugin reference counts, then the lock type, of every delegate.
void delegates_update_lock_type();

// Global delegate instances, one per hook group. RUN_HOOK/NO_HOOK build these
// names by appending "_delegate" to the group argument.
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
extern Server_state_delegate *server_state_delegate;
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;


/*
  RUN_HOOK(group, hook, args)
    Invokes `hook` on the global `<group>_delegate` with the parenthesised
    argument list `args`, e.g. RUN_HOOK(binlog_relay_io, thread_start,
    (thd, mi)). Evaluates to 0 without calling the hook when the delegate has
    no observers, otherwise to the hook's return value.

  NO_HOOK(group)
    True when `<group>_delegate` has no observers registered.

  The line continuation after the RUN_HOOK parameter list is required:
  without it the macro body is empty and the expression below it is parsed
  as stray file-scope code.
*/
#define RUN_HOOK(group, hook, args) \
  (group##_delegate->is_empty() ? 0 : group##_delegate->hook args)

#define NO_HOOK(group) (group##_delegate->is_empty())

// Declaration only; the definition is not part of this excerpt.
int launch_hook_trans_begin(THD *thd, TABLE_LIST *table);

#endif 

除了最开始的两个外部变量定义没有拷贝上来,其它都拷贝上来了。别看代码这么多,其实很简单,核心就两个类:一个是观察者类Observer_info,另一个是观察者事件(或者说行为)的处理基类Delegate。到这里其实就很清晰了,也就是说,观察者方式主要就是通过一个观察者类来监视行动,根据不同的行为来产生具体的代理类(事件类)对象并对事件进行处理。这也是一般新语言的事件处理流程:通过代理或者委托来实际处理事件的运作。
有些人可能不明白,简单解释一下:以前处理行动一般是一个(或多个)函数,而在更抽象的机制上,是把整个行动事件抽象封装成一个类。这样再看这两个类就很清楚了,一定会是你中有我,我中有你,然后是一连串的指针对象或者列表。
在事件对象中,又实现了五个子类:
//sql/replication.h
Trans_delegate:事务相关,主要在事务逻辑中应用。它包含:before_dml,before_commit,before_rollback,after_commit,after_rollback,trans_begin等。
Server_state_delegate:服务状态事件相关,用于监视服务状态启停等处。它主要有:before_handle_connection,before_recovery,after_engine_recovery,after_recovery,
before_server_shutdown,after_server_shutdown,after_dd_upgrade_from_57。
Binlog_storage_delegate:存储事件相关,主要用于日志同步。它主要有after_flush,after_sync。
Binlog_transmit_delegate:传输事件相关,主要用于传输节点间事件。它主要有:transmit_start,transmit_stop,reserve_header,before_send_event,after_send_event,after_reset_master。
Binlog_relay_IO_delegate:Relay IO事件相关,主要用于主从复制。它主要有:thread_start,thread_stop,applier_start,applier_stop,before_request_transmit,after_read_event,
after_queue_event ,after_reset_slave,applier_log_event
这几个具体结构的定义在replication.h中,其中包含上面说明里提到的相关函数指针。而相关函数指针类型的定义也在这个文件中,这里只举其中一部分代码来看:

// Signature of the hook stored in Binlog_storage_observer::after_flush.
typedef int (*after_flush_t)(Binlog_storage_param *param, const char *log_file,
                             my_off_t log_pos);
// Signature of the hook stored in Binlog_storage_observer::after_sync.
typedef int (*after_sync_t)(Binlog_storage_param *param, const char *log_file,
                            my_off_t log_pos);


// Table of binlog storage hooks that a plugin fills in and registers via
// register_binlog_storage_observer().
typedef struct Binlog_storage_observer {
  // Presumably the size of this structure, for versioning -- confirm.
  uint32 len;

  after_flush_t after_flush;
  after_sync_t after_sync;
} Binlog_storage_observer;


/**
  Parameter block handed to binlog transmit observers.

  `flags` carries the F_OBSERVE / F_DONT_OBSERVE bits that observers set
  through the helper methods below.
*/
typedef struct Binlog_transmit_param {
  uint32 server_id;
  uint32 flags;

  static const uint32 F_OBSERVE = 1;
  static const uint32 F_DONT_OBSERVE = 2;

  /** Request that the transmission be observed. */
  void set_observe_flag() { flags |= F_OBSERVE; }
  /** Request that the transmission not be observed. */
  void set_dont_observe_flag() { flags |= F_DONT_OBSERVE; }

  /**
    @return true when F_OBSERVE is set (it wins even if F_DONT_OBSERVE is
            also set) or when neither flag is set; false only when
            F_DONT_OBSERVE is set alone.
  */
  bool should_observe() {
    return (flags & F_OBSERVE) || !(flags & F_DONT_OBSERVE);
  }
} Binlog_transmit_param;

更多细节代码,可以参看相关的文件,这里就不再一一指出。

四、Plugin的应用基本流程

在明白数据结构以后,基本上如何应用,应该大体的思路就有了。但具体到怎么应用,还得看Mysql中有什么更合适的用法。那首先插件第一件事一定是注册,只有注册到相关的函数指针,具体的插件才会被调用:

//sql/rpl_handler.cc
// Adds `observer` to the transaction delegate. `p` is the opaque plugin
// handle, interpreted as st_plugin_int*. Returns add_observer()'s result.
int register_trans_observer(Trans_observer *observer, void *p) {
  st_plugin_int *const owner = static_cast<st_plugin_int *>(p);
  return transaction_delegate->add_observer(observer, owner);
}

// Removes `observer` from the transaction delegate; the plugin handle is
// unused. Returns remove_observer()'s result.
int unregister_trans_observer(Trans_observer *observer, void *) {
  const int status = transaction_delegate->remove_observer(observer);
  return status;
}

// Adds `observer` to the binlog storage delegate. `p` is the opaque plugin
// handle, interpreted as st_plugin_int*. Returns add_observer()'s result.
int register_binlog_storage_observer(Binlog_storage_observer *observer,
                                     void *p) {
  DBUG_TRACE;
  st_plugin_int *const owner = static_cast<st_plugin_int *>(p);
  return binlog_storage_delegate->add_observer(observer, owner);
}

// Removes `observer` from the binlog storage delegate; the plugin handle is
// unused. Returns remove_observer()'s result.
int unregister_binlog_storage_observer(Binlog_storage_observer *observer,
                                       void *) {
  const int status = binlog_storage_delegate->remove_observer(observer);
  return status;
}

// Adds `observer` to the server state delegate. `plugin_var` is the opaque
// plugin handle, interpreted as st_plugin_int*. Returns add_observer()'s
// result.
int register_server_state_observer(Server_state_observer *observer,
                                   void *plugin_var) {
  DBUG_TRACE;
  st_plugin_int *const owner = static_cast<st_plugin_int *>(plugin_var);
  return server_state_delegate->add_observer(observer, owner);
}

// Removes `observer` from the server state delegate; the plugin handle is
// unused. Returns remove_observer()'s result.
int unregister_server_state_observer(Server_state_observer *observer, void *) {
  DBUG_TRACE;
  return server_state_delegate->remove_observer(observer);
}

// Adds `observer` to the binlog transmit delegate. `p` is the opaque plugin
// handle, interpreted as st_plugin_int*. Returns add_observer()'s result.
int register_binlog_transmit_observer(Binlog_transmit_observer *observer,
                                      void *p) {
  st_plugin_int *const owner = static_cast<st_plugin_int *>(p);
  return binlog_transmit_delegate->add_observer(observer, owner);
}

// Removes `observer` from the binlog transmit delegate; the plugin handle is
// unused. Returns remove_observer()'s result.
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
                                        void *) {
  const int status = binlog_transmit_delegate->remove_observer(observer);
  return status;
}

// Adds `observer` to the relay I/O delegate. `p` is the opaque plugin
// handle, interpreted as st_plugin_int*. Returns add_observer()'s result.
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
                                      void *p) {
  st_plugin_int *const owner = static_cast<st_plugin_int *>(p);
  return binlog_relay_io_delegate->add_observer(observer, owner);
}

// Removes `observer` from the relay I/O delegate; the plugin handle is
// unused. Returns remove_observer()'s result.
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
                                        void *) {
  const int status = binlog_relay_io_delegate->remove_observer(observer);
  return status;
}

正如前面所讲其实都是调用了add_observer:

int Delegate::add_observer(void *observer, st_plugin_int *plugin) {
  int ret = false;
  if (!inited) return true;
  write_lock();
  Observer_info_iterator iter(observer_info_list);
  Observer_info *info = iter++;
  while (info && info->observer != observer) info = iter++;
  if (!info) {
    info = new Observer_info(observer, plugin);
    if (!info || observer_info_list.push_back(info, &memroot))
      ret = true;
    else if (this->use_spin_lock_type())
      acquire_plugin_ref_count(info);
  } else
    ret = true;
  unlock();
  return ret;
}

int Delegate::remove_observer(void *observer) {
  int ret = false;
  if (!inited) return true;
  write_lock();
  Observer_info_iterator iter(observer_info_list);
  Observer_info *info = iter++;
  while (info && info->observer != observer) info = iter++;
  if (info) {
    iter.remove();
    delete info;
  } else
    ret = true;
  unlock();
  return ret;
}

在文件中有几个宏需要看一下:

//rpl_handler.h

/*
  RUN_HOOK(group, hook, args): calls `hook` on `<group>_delegate` with the
  parenthesised argument list `args`; evaluates to 0 without calling anything
  when the delegate has no observers.
  NO_HOOK(group): true when `<group>_delegate` has no observers.

  The trailing backslash is required so that the expression on the next line
  is the macro body.
*/
#define RUN_HOOK(group, hook, args) \
  (group##_delegate->is_empty() ? 0 : group##_delegate->hook args)

#define NO_HOOK(group) (group##_delegate->is_empty())
//rpl_handler.cc

/*
  FOREACH_OBSERVER(r, f, args)

  Must be expanded inside a member function of a Delegate subclass that
  defines the `Observer` typedef. Under the read lock it walks the observer
  list and, for each observer whose hook pointer `f` is non-null, calls it
  with the parenthesised argument list `args`.

    r    - lvalue receiving the outcome: set to 1 (and iteration stops, with
           an error logged) when a hook returns non-zero; set to 0 (and
           iteration stops) when a plugin reference cannot be obtained;
           otherwise left untouched.
    f    - name of the hook member in Observer.
    args - parenthesised argument list.

  When the spin lock type is in use the stored plugin reference is used
  directly; otherwise each plugin is locked for the duration of the walk and
  the references are released after the delegate lock has been dropped.

  NOTE(review): the line continuations and the Prealloced_array template
  arguments were lost in this excerpt; <plugin_ref, 8> follows upstream
  rpl_handler.cc and should be confirmed.
*/
#define FOREACH_OBSERVER(r, f, args)                                   \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter = observer_info_iter();                  \
  Observer_info *info = iter++;                                        \
  bool replication_optimize_for_static_plugin_config =                 \
      this->use_spin_lock_type();                                      \
  for (; info; info = iter++) {                                        \
    plugin_ref plugin = (replication_optimize_for_static_plugin_config \
                             ? info->plugin                            \
                             : my_plugin_lock(0, &info->plugin));      \
    if (!plugin) {                                                     \
      /* no plugin reference available: stop, but not as an error */   \
      r = 0;                                                           \
      break;                                                           \
    }                                                                  \
    if (!replication_optimize_for_static_plugin_config)                \
      plugins.push_back(plugin);                                       \
    if (((Observer *)info->observer)->f &&                             \
        ((Observer *)info->observer)->f args) {                        \
      r = 1;                                                           \
      LogEvent()                                                       \
          .prio(ERROR_LEVEL)                                           \
          .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED)                      \
          .subsys(LOG_SUBSYSTEM_TAG)                                   \
          .function(#f)                                                \
          .message("Run function '" #f "' in plugin '%s' failed",      \
                   info->plugin_int->name.str);                        \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /* release plugin references only after the delegate lock is gone */ \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());

/*
  FOREACH_OBSERVER_ERROR_OUT(r, f, args, out)

  Variant of FOREACH_OBSERVER for hooks that take an extra trailing int
  output argument. Must be expanded inside a member function of a Delegate
  subclass that defines the `Observer` typedef.

    r    - lvalue receiving the outcome: set to 1 (and iteration stops, with
           an error logged) when a hook returns non-zero; set to 0 (and
           iteration stops) when a plugin reference cannot be obtained;
           otherwise left untouched.
    f    - name of the hook member in Observer.
    args - first argument(s) of the hook; `error_out` is appended.
    out  - lvalue to which each hook's error_out value is added.

  NOTE(review): unlike FOREACH_OBSERVER, the hook pointer is not checked for
  null before the call. The line continuations and the Prealloced_array
  template arguments were lost in this excerpt; <plugin_ref, 8> follows
  upstream rpl_handler.cc and should be confirmed.
*/
#define FOREACH_OBSERVER_ERROR_OUT(r, f, args, out)                    \
  Prealloced_array<plugin_ref, 8> plugins(PSI_NOT_INSTRUMENTED);       \
  read_lock();                                                         \
  Observer_info_iterator iter = observer_info_iter();                  \
  Observer_info *info = iter++;                                        \
                                                                       \
  bool replication_optimize_for_static_plugin_config =                 \
      this->use_spin_lock_type();                                      \
  int error_out = 0;                                                   \
  for (; info; info = iter++) {                                        \
    plugin_ref plugin = (replication_optimize_for_static_plugin_config \
                             ? info->plugin                            \
                             : my_plugin_lock(0, &info->plugin));      \
    if (!plugin) {                                                     \
      /* no plugin reference available: stop, but not as an error */   \
      r = 0;                                                           \
      break;                                                           \
    }                                                                  \
    if (!replication_optimize_for_static_plugin_config)                \
      plugins.push_back(plugin);                                       \
                                                                       \
    bool hook_error = false;                                           \
    hook_error = ((Observer *)info->observer)->f(args, error_out);     \
                                                                       \
    out += error_out;                                                  \
    if (hook_error) {                                                  \
      r = 1;                                                           \
      LogEvent()                                                       \
          .prio(ERROR_LEVEL)                                           \
          .errcode(ER_RPL_PLUGIN_FUNCTION_FAILED)                      \
          .subsys(LOG_SUBSYSTEM_TAG)                                   \
          .function(#f)                                                \
          .message("Run function '" #f "' in plugin '%s' failed",      \
                   info->plugin_int->name.str);                        \
      break;                                                           \
    }                                                                  \
  }                                                                    \
  unlock();                                                            \
  /* release plugin references only after the delegate lock is gone */ \
  if (!plugins.empty()) plugin_unlock_list(0, &plugins[0], plugins.size());

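看名字都很好理解:RUN_HOOK在观察者列表非空时调用具体的钩子函数;NO_HOOK判断是否没有任何观察者;FOREACH_OBSERVER遍历所有观察者并逐个调用其函数指针;FOREACH_OBSERVER_ERROR_OUT在遍历的同时,把各观察者通过输出参数返回的错误值累加到out中。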

注册以后就可以调用了,看下面的代码:
首先要初始化等相关:

// Constructs the five global delegates in function-local static storage
// using placement new, so no heap allocation is involved and the objects
// must later be torn down with explicit destructor calls.
//
// Returns 0 when every checked delegate reports is_inited(), 1 (after
// logging an error) as soon as one does not. Delegates constructed before a
// failure are left in place.
int delegates_init() {
  // Suitably aligned raw storage, one buffer per delegate.
  alignas(Trans_delegate) static char place_trans_mem[sizeof(Trans_delegate)];
  alignas(Binlog_storage_delegate) static char
      place_storage_mem[sizeof(Binlog_storage_delegate)];
  alignas(Server_state_delegate) static char
      place_state_mem[sizeof(Server_state_delegate)];
  alignas(Binlog_transmit_delegate) static char
      place_transmit_mem[sizeof(Binlog_transmit_delegate)];
  alignas(Binlog_relay_IO_delegate) static char
      place_relay_io_mem[sizeof(Binlog_relay_IO_delegate)];

  transaction_delegate = new (place_trans_mem) Trans_delegate;
  if (!transaction_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_TRX_DELEGATES_INIT_FAILED);
    return 1;
  }

  binlog_storage_delegate = new (place_storage_mem) Binlog_storage_delegate;
  if (!binlog_storage_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_STORAGE_DELEGATES_INIT_FAILED);
    return 1;
  }

  // NOTE(review): server_state_delegate is the only delegate whose
  // is_inited() result is not checked here -- confirm this is intentional.
  server_state_delegate = new (place_state_mem) Server_state_delegate;
  binlog_transmit_delegate = new (place_transmit_mem) Binlog_transmit_delegate;
  if (!binlog_transmit_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_TRANSMIT_DELEGATES_INIT_FAILED);
    return 1;
  }

  binlog_relay_io_delegate = new (place_relay_io_mem) Binlog_relay_IO_delegate;
  if (!binlog_relay_io_delegate->is_inited()) {
    LogErr(ERROR_LEVEL, ER_RPL_BINLOG_RELAY_DELEGATES_INIT_FAILED);
    return 1;
  }

  return 0;
}

// Runs the destructor of every delegate that has been constructed. The
// delegates live in static storage set up by delegates_init(), so only the
// destructor is invoked and nothing is freed. The global pointers are left
// unchanged.
void delegates_destroy() {
  if (transaction_delegate != nullptr) {
    transaction_delegate->~Trans_delegate();
  }
  if (binlog_storage_delegate != nullptr) {
    binlog_storage_delegate->~Binlog_storage_delegate();
  }
  if (server_state_delegate != nullptr) {
    server_state_delegate->~Server_state_delegate();
  }
  if (binlog_transmit_delegate != nullptr) {
    binlog_transmit_delegate->~Binlog_transmit_delegate();
  }
  if (binlog_relay_io_delegate != nullptr) {
    binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
  }
}

// Calls update_plugin_ref_count() on every delegate that exists. The two
// replication delegates are only touched when HAVE_REPLICATION is defined.
static void delegates_update_plugin_ref_count() {
  if (transaction_delegate != nullptr) {
    transaction_delegate->update_plugin_ref_count();
  }
  if (binlog_storage_delegate != nullptr) {
    binlog_storage_delegate->update_plugin_ref_count();
  }
  if (server_state_delegate != nullptr) {
    server_state_delegate->update_plugin_ref_count();
  }
#ifdef HAVE_REPLICATION
  if (binlog_transmit_delegate != nullptr) {
    binlog_transmit_delegate->update_plugin_ref_count();
  }
  if (binlog_relay_io_delegate != nullptr) {
    binlog_relay_io_delegate->update_plugin_ref_count();
  }
#endif
}

// Takes the write lock of every delegate that exists, in a fixed order. The
// two replication delegates are only touched when HAVE_REPLICATION is
// defined.
void delegates_acquire_locks() {
  if (transaction_delegate != nullptr) {
    transaction_delegate->write_lock();
  }
  if (binlog_storage_delegate != nullptr) {
    binlog_storage_delegate->write_lock();
  }
  if (server_state_delegate != nullptr) {
    server_state_delegate->write_lock();
  }
#ifdef HAVE_REPLICATION
  if (binlog_transmit_delegate != nullptr) {
    binlog_transmit_delegate->write_lock();
  }
  if (binlog_relay_io_delegate != nullptr) {
    binlog_relay_io_delegate->write_lock();
  }
#endif
}

// Releases the lock of every delegate that exists, in the same order used by
// delegates_acquire_locks(). The two replication delegates are only touched
// when HAVE_REPLICATION is defined.
void delegates_release_locks() {
  if (transaction_delegate != nullptr) {
    transaction_delegate->unlock();
  }
  if (binlog_storage_delegate != nullptr) {
    binlog_storage_delegate->unlock();
  }
  if (server_state_delegate != nullptr) {
    server_state_delegate->unlock();
  }
#ifdef HAVE_REPLICATION
  if (binlog_transmit_delegate != nullptr) {
    binlog_transmit_delegate->unlock();
  }
  if (binlog_relay_io_delegate != nullptr) {
    binlog_relay_io_delegate->unlock();
  }
#endif
}

// Refreshes the plugin reference counts of all delegates first, then calls
// update_lock_type() on every delegate that exists. The two replication
// delegates are only touched when HAVE_REPLICATION is defined.
void delegates_update_lock_type() {
  delegates_update_plugin_ref_count();

  if (transaction_delegate != nullptr) {
    transaction_delegate->update_lock_type();
  }
  if (binlog_storage_delegate != nullptr) {
    binlog_storage_delegate->update_lock_type();
  }
  if (server_state_delegate != nullptr) {
    server_state_delegate->update_lock_type();
  }
#ifdef HAVE_REPLICATION
  if (binlog_transmit_delegate != nullptr) {
    binlog_transmit_delegate->update_lock_type();
  }
  if (binlog_relay_io_delegate != nullptr) {
    binlog_relay_io_delegate->update_lock_type();
  }
#endif
}

然后看一个应用代码来分析一下流程:

// Excerpt of request_dump(): only the part that fires the
// before_request_transmit hook is shown; the rest is elided below.
static int request_dump(THD *thd, MYSQL *mysql, MYSQL_RPL *rpl, Master_info *mi,
                        bool *suppress_warnings) {
  DBUG_TRACE;
  // GTID-based dump command when auto-positioning is enabled, plain dump
  // command otherwise.
  enum_server_command command =
      mi->is_auto_position() ? COM_BINLOG_DUMP_GTID : COM_BINLOG_DUMP;
  
  uint binlog_flags = 0;
  // Note the RUN_HOOK invocation just below.
  *suppress_warnings = false;
  // A non-zero hook result aborts the request.
  if (RUN_HOOK(binlog_relay_io, before_request_transmit,
               (thd, mi, binlog_flags)))
    return 1;
  ......
}

RUN_HOOK的展开在上面已经有了,其实就是首先判断观察者列表是否为空:为空则直接返回0,否则调用binlog_relay_io_delegate->before_request_transmit函数。即:

/**
  Fire the before_request_transmit hook of every registered relay I/O
  observer.

  @param thd    Current thread; supplies server_id and thread_id.
  @param mi     Master info used to fill the observer parameter block.
  @param flags  Flags forwarded to the observers as uint32.

  @return 0 when no observer reported failure, 1 when one did (see
          FOREACH_OBSERVER).
*/
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, Master_info *mi,
                                                      ushort flags) {
  Binlog_relay_IO_param param;
  // The address-of operator was mangled into a pilcrow sign in the excerpt.
  init_param(&param, mi);
  param.server_id = thd->server_id;
  param.thread_id = thd->thread_id();

  int ret = 0;
  FOREACH_OBSERVER(ret, before_request_transmit, (&param, (uint32)flags));
  return ret;
}

其实梳理一下,步骤就是:
1、建立观察者事件对象delegate(根据实际应用的五种之一)用于注册。同时将函数指针赋值。
2、创建观察者对象并指向事件对象。
3、调用注册函数,将相关的Observer_info Push到注册列表。
4、在关键点调用RUN_HOOK.
5、通过宏FOREACH_OBSERVER遍历相关函数
设计应用还是和插件配合的相当默契,值得学习。

五、总结

学以致用,这才是学习的目的。看别人的代码,最终还是要学习别人的长处,吸收引用,加以消化整理,为己所用。
加油吧,归来的少年!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/300562.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号