青鱼292 2023-10-30 16:49 Acceptance rate: 89.3%
Closed

Please analyze Patroni high availability.

Please analyze the Patroni high-availability source code.
The analysis should mainly cover the Patroni HA startup flow and its steps.


1 answer

  • 谐云 (Harmony Cloud official account) 2023-10-30 17:17

    [Flowchart: Patroni high-availability startup flow]


    Flow description:
    ● Load the cluster information: through the API of the chosen DCS, fetch the cluster information, which mainly consists of the following records:
    ○ config: records the PostgreSQL cluster ID and the configuration (PostgreSQL parameters, various timeout settings, etc.), used for cluster validation, node rebuilds and so on;
    ○ leader: records the leader election time, heartbeat time, election cycle, latest LSN, etc.; written by the node that wins the leader race (an example of this record is sketched right after this list);
    ○ sync: records the leader and the synchronous node; written by the leader and used to validate the synchronous node during switchover and failover;
    ○ failover: records the time of the last failover.
    ● Cluster state detection: validates the cluster configuration, checks the overall cluster state and the state of each node, and decides how PostgreSQL should be started;
    ● Start PostgreSQL: initializes the PostgreSQL data directory, applies the PostgreSQL settings derived from the cluster information, and starts the instance;
    ● Form the PostgreSQL cluster: assigns primary and standby roles to the started PostgreSQL nodes and links them together, producing the complete cluster.
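    For reference, here is roughly what the leader record looks like when Kubernetes is the DCS: Patroni keeps it as annotations on the leader object. The field names 'acquireTime', 'ttl', 'renewTime' and 'transitions' match the ones read back in the _load_cluster code shown later; 'leader' and 'optime' stand in for the self._LEADER and self._OPTIME constants; the concrete values are made up for illustration.

    # Illustrative sketch only: approximate shape of the leader-key annotations
    # that Patroni reads when loading the cluster from a Kubernetes DCS.
    leader_annotations = {
        'leader': 'pg142-1013-postgresql-0',          # current leader member name
        'acquireTime': '2023-10-30T08:00:00+00:00',   # when the leader lock was acquired
        'renewTime': '2023-10-30T09:00:00+00:00',     # last renewal (heartbeat) of the lock
        'ttl': '30',                                  # leader lease TTL in seconds
        'transitions': '3',                           # number of leadership changes so far
        'optime': '67108864',                         # last known leader LSN (WAL position)
    }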
    Loading cluster information
    Loading the cluster information is the first step of the high-availability flow, and it is the key input for forming the PostgreSQL cluster.

    # Step 1: load the cluster information
    ......
    try:
        self.load_cluster_from_dcs()
        self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover)
    except Exception:
        self.state_handler.reset_cluster_info_state(None, self.patroni.nofailover)
        raise
    ......
    # Load the cluster information through the DCS interface
    def load_cluster_from_dcs(self):
        cluster = self.dcs.get_cluster()
    
        # We want to keep the state of cluster when it was healthy
        if not cluster.is_unlocked() or not self.old_cluster:
            self.old_cluster = cluster
        self.cluster = cluster
    
        if not self.has_lock(False):
            self.set_is_leader(False)
    
        self._leader_timeline = None if cluster.is_unlocked() else cluster.leader.timeline
    
    # The DCS cluster interface (base class)
    def get_cluster(self, force=False):
        if force:
            self._bypass_caches()
        try:
            cluster = self._load_cluster()
        except Exception:
            self.reset_cluster()
            raise
    
        self._last_seen = int(time.time())
    
        with self._cluster_thread_lock:
            self._cluster = cluster
            self._cluster_valid_till = time.time() + self.ttl
            return cluster
    
    @abc.abstractmethod
    def _load_cluster(self):
        """Internally this method should build  `Cluster` object which
           represents current state and topology of the cluster in DCS.
           this method supposed to be called only by `get_cluster` method.
    
           raise `~DCSError` in case of communication or other problems with DCS.
           If the current node was running as a master and exception raised,
           instance would be demoted."""
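
    Every concrete DCS backend (Etcd, Consul, ZooKeeper, Kubernetes, Raft, ...) only needs to supply _load_cluster; get_cluster adds the TTL-based caching and the error handling shown above. A minimal, self-contained sketch of that contract, using a made-up in-memory backend rather than a real Patroni class, might look like this:

    import time

    # Hypothetical sketch of the get_cluster()/_load_cluster() contract;
    # ToyDCS and loader are illustrative names, not part of Patroni.
    class ToyDCS(object):
        def __init__(self, ttl, loader):
            self.ttl = ttl
            self._loader = loader          # callable standing in for a real DCS query
            self._cluster = None
            self._cluster_valid_till = 0

        def _load_cluster(self):
            # In Patroni this builds a Cluster object from the DCS content and
            # raises DCSError when the DCS cannot be reached.
            return self._loader()

        def get_cluster(self):
            try:
                cluster = self._load_cluster()
            except Exception:
                self._cluster = None       # corresponds to reset_cluster() above
                raise                      # a leader seeing this will demote itself
            self._cluster = cluster
            self._cluster_valid_till = time.time() + self.ttl
            return cluster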
    

    Taking Kubernetes as the DCS, for example:

    def _load_cluster(self):
        stop_time = time.time() + self._retry.deadline
        self._api.refresh_api_servers_cache()
        try:
            with self._condition:
                self._wait_caches(stop_time)
    
                members = [self.member(pod) for pod in self._pods.copy().values()]
                nodes = self._kinds.copy()
    
            config = nodes.get(self.config_path)
            metadata = config and config.metadata
            annotations = metadata and metadata.annotations or {}
    
            # get initialize flag
            initialize = annotations.get(self._INITIALIZE)
    
            # get global dynamic configuration
            config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                             annotations.get(self._CONFIG) or '{}',
                                             metadata.resource_version if self._CONFIG in annotations else 0)
    
            # get timeline history
            history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                                annotations.get(self._HISTORY) or '[]')
    
            leader = nodes.get(self.leader_path)
            metadata = leader and leader.metadata
            self._leader_resource_version = metadata.resource_version if metadata else None
            annotations = metadata and metadata.annotations or {}
    
            # get last known leader lsn
            last_lsn = annotations.get(self._OPTIME)
            try:
                last_lsn = 0 if last_lsn is None else int(last_lsn)
            except Exception:
                last_lsn = 0
    
            # get permanent slots state (confirmed_flush_lsn)
            slots = annotations.get('slots')
            try:
                slots = slots and json.loads(slots)
            except Exception:
                slots = None
    
            # get leader
            leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                             'ttl', 'renewTime', 'transitions') if n in annotations}
            if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
                self._leader_observed_record = leader_record
                self._leader_observed_time = time.time()
    
            leader = leader_record.get(self._LEADER)
            try:
                ttl = int(leader_record.get('ttl')) or self._ttl
            except (TypeError, ValueError):
                ttl = self._ttl
    
            if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
                leader = None
    
            if metadata:
                member = Member(-1, leader, None, {})
                member = ([m for m in members if m.name == leader] or [member])[0]
                leader = Leader(metadata.resource_version, None, member)
    
            # failover key
            failover = nodes.get(self.failover_path)
            metadata = failover and failover.metadata
            failover = Failover.from_node(metadata and metadata.resource_version,
                                          metadata and (metadata.annotations or {}).copy())
    
            # get synchronization state
            sync = nodes.get(self.sync_path)
            metadata = sync and sync.metadata
            sync = SyncState.from_node(metadata and metadata.resource_version,  metadata and metadata.annotations)
    
            return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
        except Exception:
            logger.exception('get_cluster')
            raise KubernetesError('Kubernetes API is not responding properly')
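
    One detail worth isolating from the code above is the leader-lease check: the 'ttl' annotation is parsed defensively, and if the leader lock has not been renewed within ttl seconds, the observed leader is discarded (leader = None), which lets the other members start a leader race. A small stand-alone sketch of that logic (function and parameter names are made up for illustration):

    import time

    # Illustrative restatement of the leader-lease expiry check above.
    def observed_leader_is_valid(leader_observed_time, ttl_annotation, default_ttl=30):
        # Parse the 'ttl' annotation defensively, falling back to the configured default.
        try:
            ttl = int(ttl_annotation) or default_ttl
        except (TypeError, ValueError):
            ttl = default_ttl
        # The lock is only honoured if it was renewed within the last `ttl` seconds.
        return leader_observed_time is not None and leader_observed_time + ttl >= time.time()

    # e.g. a leader record last renewed 45 seconds ago with ttl=30 is treated as expired:
    print(observed_leader_is_valid(time.time() - 45, '30'))   # False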
    
    

    Cluster state detection

      if self.is_paused():
          self.watchdog.disable()
          self._was_paused = True
      else:
          if self._was_paused:
              self.state_handler.schedule_sanity_checks_after_pause()
          self._was_paused = False
      
      if not self.cluster.has_member(self.state_handler.name):
          self.touch_member()
      
      # cluster has leader key but not initialize key
      if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
          self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)
      
      if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
          self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration, separators=(',', ':')))
          self.cluster = self.dcs.get_cluster()
      
      if self._async_executor.busy:
          return self.handle_long_action_in_progress()
      
      msg = self.handle_starting_instance()
      if msg is not None:
          return msg
      
      # we've got here, so any async action has finished.
      if self.state_handler.bootstrapping:
          return self.post_bootstrap()
      
      if self.recovering:
          self.recovering = False
      
          if not self._rewind.is_needed:
              # Check if we tried to recover from postgres crash and failed
              msg = self.post_recover()
              if msg is not None:
                  return msg
      
          # Reset some states after postgres successfully started up
          self._crash_recovery_executed = False
          if self._rewind.executed and not self._rewind.failed:
              self._rewind.reset_state()
      
          # The Raft cluster without a quorum takes a bit of time to stabilize.
          # Therefore we want to postpone the leader race if we just started up.
          if self.cluster.is_unlocked() and self.dcs.__class__.__name__ == 'Raft':
              return 'started as a secondary'
    
    

    Checking whether the cluster is paused

    Pausing the cluster means the PostgreSQL nodes are no longer managed by Patroni: when the cluster runs into trouble, failover and other recovery actions are no longer triggered.
    A pause is usually initiated deliberately by the user, for example for maintenance on a single PostgreSQL node. It is triggered as follows:

    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
    root@pg142-1013-postgresql-0:/home/postgres# patronictl pause
    Success: cluster management is paused
    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
     Maintenance mode: on
    

    The output above ("Maintenance mode: on") shows that cluster management is now paused. The PostgreSQL processes stay alive, but if one of them fails the user has to start it again manually.
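    Under the hood, `patronictl pause` stores a pause flag in the cluster-wide dynamic configuration (the config key in the DCS), and each member's HA loop consults it before acting; this is what the `self.is_paused()` branch in the state-detection code above reacts to. A simplified sketch (the JSON value is illustrative):

    import json

    # Illustrative only: the dynamic configuration stored under the DCS config key
    # gains a "pause" flag when `patronictl pause` is run.
    dynamic_config = json.loads('{"ttl": 30, "loop_wait": 10, "pause": true}')

    def is_paused(config_data):
        # Simplified stand-in for the is_paused() check in the HA loop above.
        return bool(config_data.get('pause', False))

    print(is_paused(dynamic_config))   # True -> watchdog disabled, no failover is triggered
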
    To resume the cluster:

    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
     Maintenance mode: on
    root@pg142-1013-postgresql-0:/home/postgres# patronictl resume
    Success: cluster management is resumed
    root@pg142-1013-postgresql-0:/home/postgres# patronictl list
    + Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
    | Member                  | Host           | Role         | State   | TL | Lag in MB |
    +-------------------------+----------------+--------------+---------+----+-----------+
    | pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
    | pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
    +-------------------------+----------------+--------------+---------+----+-----------+
    
    

    With this command the cluster is back under Patroni's management.
    After resuming, the PostgreSQL nodes in the cluster still have to be reconciled:
    1. the PostgreSQL parameters are re-applied;
    2. based on the leader and synchronous-node names last recorded in the xxx-sync key, the synchronous replication (slot) information is set up on the leader (a sketch of that record follows this list);
    3. the recovered PostgreSQL nodes are checked for changes against the values last recorded in xxx-config; if they no longer match, the cluster cannot be resumed.
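
    For reference, the xxx-sync record parsed by SyncState.from_node above usually carries just the current leader and the synchronous standby. The sketch below is illustrative, reusing the member names from the patronictl output earlier:

    # Illustrative only: approximate content of the xxx-sync annotations that
    # SyncState.from_node() parses when the cluster is loaded from the DCS.
    sync_annotations = {
        'leader': 'pg142-1013-postgresql-0',         # node that wrote the sync state
        'sync_standby': 'pg142-1013-postgresql-1',   # member(s) acceptable as synchronous standby
    }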

    This answer was accepted by the asker as the best answer.
