青鱼292 2023-10-30 16:56

In the Patroni high-availability flow, how is the PostgreSQL cluster generated? Please analyze the healthy and unhealthy cluster flows.


1 answer

  • 谐云 (official account) 2023-10-30 17:38

    Starting PostgreSQL
    The excerpt below from Patroni's HA run cycle decides between the empty and non-empty data directory paths when starting PostgreSQL:

    # is data directory empty?
    if self.state_handler.data_directory_empty():
        self.state_handler.set_role('uninitialized')
        self.state_handler.stop('immediate', stop_timeout=self.patroni.config['retry_timeout'])
        # In case datadir went away while we were master.
        self.watchdog.disable()
    
        # is this instance the leader?
        if self.has_lock():
            self.release_leader_key_voluntarily()
            return 'released leader key voluntarily as data dir empty and currently leader'
    
        if self.is_paused():
            return 'running with empty data directory'
        return self.bootstrap()  # new node
    else:
        # check if we are allowed to join
        data_sysid = self.state_handler.sysid
        if not self.sysid_valid(data_sysid):
            # data directory is not empty, but no valid sysid, cluster must be broken, suggest reinit
            return ("data dir for the cluster is not empty, "
                    "but system ID is invalid; consider doing reinitialize")
    
        if self.sysid_valid(self.cluster.initialize):
            if self.cluster.initialize != data_sysid:
                if self.is_paused():
                    logger.warning('system ID has changed while in paused mode. Patroni will exit when resuming'
                                   ' unless system ID is reset: %s != %s', self.cluster.initialize, data_sysid)
                    if self.has_lock():
                        self.release_leader_key_voluntarily()
                        return 'released leader key voluntarily due to the system ID mismatch'
                else:
                    logger.fatal('system ID mismatch, node %s belongs to a different cluster: %s != %s',
                                 self.state_handler.name, self.cluster.initialize, data_sysid)
                    sys.exit(1)
        elif self.cluster.is_unlocked() and not self.is_paused():
            # "bootstrap", but data directory is not empty
            if not self.state_handler.cb_called and self.state_handler.is_running() \
                    and not self.state_handler.is_leader():
                self._join_aborted = True
                logger.error('No initialize key in DCS and PostgreSQL is running as replica, aborting start')
                logger.error('Please first start Patroni on the node running as master')
                sys.exit(1)
            self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=data_sysid)
    
    

    Starting without a data directory
    Starting without a data directory is the flow triggered whenever PGDATA turns out to be empty, e.g. after a failed directory initialization, a failed node restore, or a failed WAL catch-up:
    1. Set the role to 'uninitialized' so the cluster can be re-initialized later;
    2. Immediately stop the current PostgreSQL process;
    3. If the current node holds the leader lock, voluntarily release it;
    4. Run the bootstrap operation (a minimal sketch of the emptiness check follows this list).
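
    Before moving on to bootstrap() below, here is a minimal sketch (not Patroni's actual implementation) of the "is the data directory empty?" check that gates this whole flow; the PGDATA path is only an assumed example.

    import os

    def data_directory_empty(data_dir='/var/lib/postgresql/data'):
        """Treat a missing or empty PGDATA as 'empty', i.e. the node must bootstrap."""
        if not os.path.isdir(data_dir):
            return True
        # Ignore lost+found so a freshly mounted but unused volume still counts as empty.
        return not [f for f in os.listdir(data_dir) if f != 'lost+found']

    if __name__ == '__main__':
        print('needs bootstrap:', data_directory_empty())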

    def bootstrap(self):
      if not self.cluster.is_unlocked():  # cluster already has leader
          clone_member = self.cluster.get_clone_member(self.state_handler.name)
          member_role = 'leader' if clone_member == self.cluster.leader else 'replica'
          msg = "from {0} '{1}'".format(member_role, clone_member.name)
          ret = self._async_executor.try_run_async('bootstrap {0}'.format(msg), self.clone, args=(clone_member, msg))
          return ret or 'trying to bootstrap {0}'.format(msg)
      
      # no initialize key and node is allowed to be master and has 'bootstrap' section in a configuration file
      elif self.cluster.initialize is None and not self.patroni.nofailover and 'bootstrap' in self.patroni.config:
          if self.dcs.initialize(create_new=True):  # race for initialization
              self.state_handler.bootstrapping = True
              with self._async_response:
                  self._async_response.reset()
      
              if self.is_standby_cluster():
                  ret = self._async_executor.try_run_async('bootstrap_standby_leader', self.bootstrap_standby_leader)
                  return ret or 'trying to bootstrap a new standby leader'
              else:
                  ret = self._async_executor.try_run_async('bootstrap', self.state_handler.bootstrap.bootstrap,
                                                           args=(self.patroni.config['bootstrap'],))
                  return ret or 'trying to bootstrap a new cluster'
          else:
              return 'failed to acquire initialize lock'
      else:
          create_replica_methods = self.get_standby_cluster_config().get('create_replica_methods', []) \
                                   if self.is_standby_cluster() else None
          if self.state_handler.can_create_replica_without_replication_connection(create_replica_methods):
              msg = 'bootstrap (without leader)'
              return self._async_executor.try_run_async(msg, self.clone) or 'trying to ' + msg
          return 'waiting for {0}leader to bootstrap'.format('standby_' if self.is_standby_cluster() else '')
    

    The bootstrap() method above covers the possible start-up modes:
    1. The cluster already has a leader: the current PostgreSQL instance starts as a replica and clones its data from the leader (or from a designated clone member);
    2. The cluster has no leader: the node races for the initialize key and, if it wins, bootstraps a brand-new cluster as the leader; for a standby cluster it bootstraps a standby leader instead (a sketch of this race follows the list);
    3. The cluster is a standby cluster without a leader: the node synchronizes its data from the primary cluster, typically via streaming replication or one of the configured create_replica_methods.
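
    The "race for initialization" in mode 2 means that only the node which atomically creates the initialize key in the DCS gets to bootstrap; everyone else clones from the winner. The DcsClient class below is a hypothetical in-memory stand-in for a real DCS (etcd/Consul/ZooKeeper) client, shown only to illustrate the semantics.

    class DcsClient:
        """Hypothetical DCS client: create_if_not_exists() models an atomic compare-and-create."""
        def __init__(self):
            self._keys = {}

        def create_if_not_exists(self, key, value):
            # Atomic in a real DCS; this in-memory version only illustrates the idea.
            if key in self._keys:
                return False
            self._keys[key] = value
            return True

    def try_initialize(dcs, node_name, sysid):
        if dcs.create_if_not_exists('/service/demo/initialize', sysid):
            return '{0} won the race and will bootstrap a new cluster'.format(node_name)
        return '{0} lost the race and will clone from the winner'.format(node_name)

    dcs = DcsClient()
    print(try_initialize(dcs, 'node1', '7291620329636954563'))  # wins the race
    print(try_initialize(dcs, 'node2', '7291620329636954563'))  # loses, will become a replica
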
    Starting with an existing data directory
    Starting with an existing data directory mainly validates that the cluster ID (the initialize key in the DCS) is consistent with the node's PostgreSQL system identifier (sysid). The main steps are:
    1. Check that the node's sysid is valid; if it is not, the data directory is considered broken and a reinitialize is suggested;
    2. Check that the cluster ID matches the node's sysid; on a mismatch the node cannot join the cluster (Patroni exits), and if the cluster is paused and this node holds the leader lock, the lock is released voluntarily;
    3. If the cluster has no initialize key yet, the current node re-initializes the cluster in the DCS, registering its own sysid as the new cluster ID (a sketch of reading the sysid follows this list).
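
    Patroni obtains the node's system identifier from pg_controldata; the hedged sketch below does the same by hand for the consistency check described above. The data directory path is only an assumed example, and pg_controldata must be on PATH.

    import subprocess

    def read_system_identifier(data_dir='/var/lib/postgresql/data'):
        """Return the 'Database system identifier' reported by pg_controldata, or None."""
        out = subprocess.check_output(['pg_controldata', '-D', data_dir], text=True)
        for line in out.splitlines():
            if line.startswith('Database system identifier'):
                return line.split(':', 1)[1].strip()
        return None

    def sysid_matches_cluster(data_sysid, initialize_key):
        # The node may only join when its sysid equals the cluster's initialize key.
        return data_sysid is not None and data_sysid == initialize_key
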
    Generating the PostgreSQL cluster

    
    try:
        if self.cluster.is_unlocked():
            ret = self.process_unhealthy_cluster()
        else:
            msg = self.process_healthy_cluster()
            ret = self.evaluate_scheduled_restart() or msg
    finally:
        # we might not have a valid PostgreSQL connection here if another thread
        # stops PostgreSQL, therefore, we only reload replication slots if no
        # asynchronous processes are running (should be always the case for the master)
        if not self._async_executor.busy and not self.state_handler.is_starting():
            create_slots = self.state_handler.slots_handler.sync_replication_slots(self.cluster,
                                                                                   self.patroni.nofailover)
            if not self.state_handler.cb_called:
                if not self.state_handler.is_leader():
                    self._rewind.trigger_check_diverged_lsn()
                self.state_handler.call_nowait(ACTION_ON_START)
            if create_slots and self.cluster.leader:
                err = self._async_executor.try_run_async('copy_logical_slots',
                                                         self.state_handler.slots_handler.copy_logical_slots,
                                                         args=(self.cluster.leader, create_slots))
                if not err:
                    ret = 'Copying logical slots {0} from the primary'.format(create_slots)
    

    Generating the PostgreSQL cluster comes down to whether the cluster currently has a leader: if the leader key exists in the DCS, the run cycle above takes the healthy-cluster flow, otherwise the unhealthy-cluster flow. A small observational sketch of that decision is shown below.
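
    GET /leader on Patroni's REST API returns HTTP 200 only on the node that currently holds the leader lock, so polling all members tells you, from the outside, which path the next cycle will take. The member URLs and port below are assumptions for this example.

    import requests

    MEMBERS = ['http://node1:8008', 'http://node2:8008', 'http://node3:8008']  # assumed endpoints

    def cluster_has_leader(members=MEMBERS):
        """Return True if any member currently answers 200 on /leader."""
        for base in members:
            try:
                if requests.get(base + '/leader', timeout=2).status_code == 200:
                    return True
            except requests.RequestException:
                continue
        return False

    print('healthy-cluster flow' if cluster_has_leader() else 'unhealthy-cluster flow (leader race)')
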
    Unhealthy cluster flow

    def process_unhealthy_cluster(self):
      """Cluster has no leader key"""
    
      if self.is_healthiest_node():
          if self.acquire_lock():
              failover = self.cluster.failover
              if failover:
                  if self.is_paused() and failover.leader and failover.candidate:
                      logger.info('Updating failover key after acquiring leader lock...')
                      self.dcs.manual_failover('', failover.candidate, failover.scheduled_at, failover.index)
                  else:
                      logger.info('Cleaning up failover key after acquiring leader lock...')
                      self.dcs.manual_failover('', '')
              self.load_cluster_from_dcs()
    
              if self.is_standby_cluster():
                  # standby leader disappeared, and this is the healthiest
                  # replica, so it should become a new standby leader.
                  # This implies we need to start following a remote master
                  msg = 'promoted self to a standby leader by acquiring session lock'
                  return self.enforce_follow_remote_master(msg)
              else:
                  return self.enforce_master_role(
                      'acquired session lock as a leader',
                      'promoted self to leader by acquiring session lock'
                  )
          else:
              return self.follow('demoted self after trying and failing to obtain lock',
                                 'following new leader after trying and failing to obtain lock')
      else:
          # when we are doing manual failover there is no guaranty that new leader is ahead of any other node
          # node tagged as nofailover can be ahead of the new leader either, but it is always excluded from elections
          if bool(self.cluster.failover) or self.patroni.nofailover:
              self._rewind.trigger_check_diverged_lsn()
              time.sleep(2)  # Give a time to somebody to take the leader lock
    
          if self.patroni.nofailover:
              return self.follow('demoting self because I am not allowed to become master',
                                 'following a different leader because I am not allowed to promote')
          return self.follow('demoting self because i am not the healthiest node',
                             'following a different leader because i am not the healthiest node')
    
    

    The unhealthy-cluster flow is about picking a candidate for the leader key. The first requirement is that the node itself is healthy, which is decided mainly by the following conditions:
    1. The cluster is not paused;
    2. The PostgreSQL instance is not still starting up;
    3. The node is allowed to fail over (it does not carry the nofailover tag);
    4. The node's WAL position does not lag behind the last LSN the previous leader reported to the DCS by more than the allowed amount (see the sketch after this list).
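
    Condition 4 corresponds to Patroni's maximum_lag_on_failover setting (1048576 bytes by default). A hedged sketch of the comparison, with purely illustrative LSN values:

    def is_lagging(my_wal_position, last_leader_operation, maximum_lag_on_failover=1048576):
        """True if this node is too far behind the last LSN the old leader wrote to the DCS."""
        lag = (last_leader_operation or 0) - my_wal_position
        return lag > maximum_lag_on_failover

    leader_lsn = 100 * 1024 * 1024                                # last known leader position (illustrative)
    print(is_lagging(leader_lsn - 512 * 1024, leader_lsn))        # False: 512 KiB behind, still eligible
    print(is_lagging(leader_lsn - 2 * 1024 * 1024, leader_lsn))   # True: 2 MiB behind, excluded from the race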

        def is_healthiest_node(self):
            if time.time() - self._released_leader_key_timestamp < self.dcs.ttl:
                logger.info('backoff: skip leader race after pre_promote script failure and releasing the lock voluntarily')
                return False
    
            if self.is_paused() and not self.patroni.nofailover and \
                    self.cluster.failover and not self.cluster.failover.scheduled_at:
                ret = self.manual_failover_process_no_leader()
                if ret is not None:  # continue if we just deleted the stale failover key as a master
                    return ret
    
            if self.state_handler.is_starting():  # postgresql still starting up is unhealthy
                return False
    
            if self.state_handler.is_leader():
                # in pause leader is the healthiest only when no initialize or sysid matches with initialize!
                return not self.is_paused() or not self.cluster.initialize\
                        or self.state_handler.sysid == self.cluster.initialize
    
            if self.is_paused():
                return False
    
            if self.patroni.nofailover:  # nofailover tag makes node always unhealthy
                return False
    
            if self.cluster.failover:
                # When doing a switchover in synchronous mode only synchronous nodes and former leader are allowed to race
                if self.is_synchronous_mode() and self.cluster.failover.leader and \
                        self.cluster.failover.candidate and not self.cluster.sync.matches(self.state_handler.name):
                    return False
                return self.manual_failover_process_no_leader()
    
            if not self.watchdog.is_healthy:
                logger.warning('Watchdog device is not usable')
                return False
    
            # When in sync mode, only last known master and sync standby are allowed to promote automatically.
            all_known_members = self.cluster.members + self.old_cluster.members
            if self.is_synchronous_mode() and self.cluster.sync and self.cluster.sync.leader:
                if not self.cluster.sync.matches(self.state_handler.name):
                    return False
                # pick between synchronous candidates so we minimize unnecessary failovers/demotions
                members = {m.name: m for m in all_known_members if self.cluster.sync.matches(m.name)}
            else:
                # run usual health check
                members = {m.name: m for m in all_known_members}
    
            return self._is_healthiest_node(members.values())
    
    
    ......
    
        def _is_healthiest_node(self, members, check_replication_lag=True):
            """This method tries to determine whether I am healthy enough to became a new leader candidate or not."""
    
            my_wal_position = self.state_handler.last_operation()
            if check_replication_lag and self.is_lagging(my_wal_position):
                logger.info('My wal position exceeds maximum replication lag')
                return False  # Too far behind last reported wal position on master
    
            if not self.is_standby_cluster() and self.check_timeline():
                cluster_timeline = self.cluster.timeline
                my_timeline = self.state_handler.replica_cached_timeline(cluster_timeline)
                if my_timeline < cluster_timeline:
                    logger.info('My timeline %s is behind last known cluster timeline %s', my_timeline, cluster_timeline)
                    return False
    
            # Prepare list of nodes to run check against
            members = [m for m in members if m.name != self.state_handler.name and not m.nofailover and m.api_url]
    
            if members:
                for st in self.fetch_nodes_statuses(members):
                    if st.failover_limitation() is None:
                        if not st.in_recovery:
                            logger.warning('Master (%s) is still alive', st.member.name)
                            return False
                        if my_wal_position < st.wal_position:
                            logger.info('Wal position of %s is ahead of my wal position', st.member.name)
                            # In synchronous mode the former leader might be still accessible and even be ahead of us.
                            # We should not disqualify himself from the leader race in such a situation.
                            if not self.is_synchronous_mode() or st.member.name != self.cluster.sync.leader:
                                return False
                            logger.info('Ignoring the former leader being ahead of us')
            return True
    
    

    If the current node turns out to be the healthiest node while the cluster has no leader, it tries to acquire the leader lock; if it fails to win the lock, it joins the cluster as a replica and follows whoever did. Part of the "healthiest" decision is comparing the node against the state its peers report, as sketched below.
    If the current node is unhealthy, it keeps following and only takes part in the election again once its PostgreSQL instance is healthy.
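
    The peer comparison in _is_healthiest_node() is driven by each member's GET /patroni REST endpoint, which reports its role, state, timeline and WAL position. A hedged sketch of fetching and filtering those statuses; the URLs are assumptions and the exact JSON field names can differ between Patroni versions.

    import requests

    def fetch_node_status(api_url):
        """Return the /patroni status document for one member, or None if unreachable."""
        try:
            return requests.get(api_url + '/patroni', timeout=2).json()
        except (requests.RequestException, ValueError):
            return None

    def peers_ahead_of_me(my_wal_position, member_api_urls):
        """URLs of peers that are running as replicas but report a replay position ahead of ours."""
        ahead = []
        for url in member_api_urls:
            status = fetch_node_status(url)
            if not status or status.get('role') != 'replica':
                continue  # skip unreachable members and anything not running as a replica
            if status.get('xlog', {}).get('replayed_location', 0) > my_wal_position:
                ahead.append(url)
        return ahead
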
    Healthy cluster flow

    def process_healthy_cluster(self):
      if self.has_lock():
          if self.is_paused() and not self.state_handler.is_leader():
              if self.cluster.failover and self.cluster.failover.candidate == self.state_handler.name:
                  return 'waiting to become master after promote...'
    
              if not self.is_standby_cluster():
                  self._delete_leader()
                  return 'removed leader lock because postgres is not running as master'
    
          if self.update_lock(True):
              msg = self.process_manual_failover_from_leader()
              if msg is not None:
                  return msg
    
              # check if the node is ready to be used by pg_rewind
              self._rewind.ensure_checkpoint_after_promote(self.wakeup)
    
              if self.is_standby_cluster():
                  # in case of standby cluster we don't really need to
                  # enforce anything, since the leader is not a master.
                  # So just remind the role.
                  msg = 'no action. I am ({0}), the standby leader with the lock'.format(self.state_handler.name) \
                        if self.state_handler.role == 'standby_leader' else \
                        'promoted self to a standby leader because i had the session lock'
                  return self.enforce_follow_remote_master(msg)
              else:
                  return self.enforce_master_role(
                      'no action. I am ({0}), the leader with the lock'.format(self.state_handler.name),
                      'promoted self to leader because I had the session lock'
                  )
          else:
              # Either there is no connection to DCS or someone else acquired the lock
              logger.error('failed to update leader lock')
              if self.state_handler.is_leader():
                  if self.is_paused():
                      return 'continue to run as master after failing to update leader lock in DCS'
                  self.demote('immediate-nolock')
                  return 'demoted self because failed to update leader lock in DCS'
              else:
                  return 'not promoting because failed to update leader lock in DCS'
      else:
          logger.debug('does not have lock')
      lock_owner = self.cluster.leader and self.cluster.leader.name
      if self.is_standby_cluster():
          return self.follow('cannot be a real primary in a standby cluster',
                             'no action. I am ({0}), a secondary, and following a standby leader ({1})'.format(
                                  self.state_handler.name, lock_owner), refresh=False)
      return self.follow('demoting self because I do not have the lock and I was a leader',
                         'no action. I am ({0}), a secondary, and following a leader ({1})'.format(
                              self.state_handler.name, lock_owner), refresh=False)
    
    

    The healthy-cluster flow applies when the cluster already has a leader, and it is handled in two directions:
    1. If the current node holds the leader lock, it keeps refreshing the lock as a heartbeat so that replicas do not race for it; if the lock update fails, the node demotes itself immediately so that another node can take the lock (a sketch of this loop follows the list);
    2. If the current node does not hold the lock, it runs as a replica and follows the leader.
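
    A hedged sketch of that leader heartbeat under a TTL-based lock; DcsLock and update_leader() are hypothetical stand-ins for the real DCS operations (the real logic lives in update_lock() and demote() above).

    class DcsLock:
        """Hypothetical TTL-based leader lock; a stand-in for the real DCS client."""
        def __init__(self, ttl=30):
            self.ttl = ttl

        def update_leader(self):
            # In a real DCS this refreshes the leader key's TTL and fails on
            # connection loss or if another node has taken the key over.
            return True

    def leader_cycle(lock):
        """One HA cycle on the node that currently holds the leader lock."""
        if lock.update_leader():
            return 'no action. I am the leader with the lock'
        # Mirrors process_healthy_cluster(): failing to refresh the lock means the node
        # stops acting as the leader right away so another node can take over.
        return 'demoted self because failed to update leader lock in DCS'

    lock = DcsLock(ttl=30)
    for _ in range(3):  # Patroni repeats this roughly every loop_wait (10 s by default)
        print(leader_cycle(lock))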

    This answer was accepted as the best answer by the asker.
