Could you analyze the Patroni high-availability source code?
Mainly the Patroni HA startup flow and its steps.
Flow overview:
● Load cluster information: through the API of the underlying DCS, fetch the cluster information, mainly the following records (a concrete sketch follows this list):
○ config: records the PostgreSQL cluster ID and configuration (PostgreSQL parameters, various timeout settings, etc.); used for cluster validation, node rebuilds, and so on;
○ leader: records the leader election time, heartbeat time, election cycle, latest LSN, etc.; written once a node wins the leader race;
○ sync: records the leader and synchronous-standby node names, written by the leader; used to validate the sync node during switchover and failover;
○ failover: records the time of the last failover.
● Cluster state detection: validates the cluster configuration, checks the overall cluster state and each node's state, and decides how PostgreSQL should be started;
● Start PostgreSQL: initializes the PostgreSQL data directory, applies the PostgreSQL configuration derived from the cluster information, and starts the instance;
● Form the PostgreSQL cluster: assigns primary and standby roles to the started PostgreSQL nodes and links the nodes together into a complete cluster.
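To make those four records concrete, here is a minimal sketch of what they typically contain, laid out in the /service/<scope>/ key style used by etcd-based DCS backends. All concrete values below are illustrative assumptions, not data captured from a real cluster:

# Illustrative snapshot of the DCS records Patroni reads on startup
# (values are made up for demonstration; real clusters differ).
dcs_snapshot = {
    '/service/pg142/config': {                 # cluster ID + dynamic configuration
        'initialize': '7289263672843878470',   # PostgreSQL system identifier
        'postgresql': {'parameters': {'max_connections': 100}},
        'ttl': 30, 'loop_wait': 10, 'retry_timeout': 10,
    },
    '/service/pg142/leader': {                 # written by the current leader
        'leader': 'pg142-1013-postgresql-0',
        'acquireTime': '2023-10-13T08:00:00+00:00',
        'renewTime': '2023-10-13T08:00:10+00:00',
        'ttl': 30, 'transitions': '3',
        'optime': 67108864,                    # latest known LSN
    },
    '/service/pg142/sync': {                   # leader + sync standby, written by leader
        'leader': 'pg142-1013-postgresql-0',
        'sync_standby': 'pg142-1013-postgresql-1',
    },
    '/service/pg142/failover': {               # last failover request/record
        'scheduled_at': None,
    },
}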
Loading cluster information
Loading the cluster information is the first step of the HA startup flow and provides the most critical input for forming the PostgreSQL cluster.

# Step 1: load the cluster information
......
try:
    self.load_cluster_from_dcs()
    self.state_handler.reset_cluster_info_state(self.cluster, self.patroni.nofailover)
except Exception:
    self.state_handler.reset_cluster_info_state(None, self.patroni.nofailover)
    raise
......

# Load the cluster information through the DCS interface
def load_cluster_from_dcs(self):
    cluster = self.dcs.get_cluster()

    # We want to keep the state of cluster when it was healthy
    if not cluster.is_unlocked() or not self.old_cluster:
        self.old_cluster = cluster
    self.cluster = cluster

    if not self.has_lock(False):
        self.set_is_leader(False)

    self._leader_timeline = None if cluster.is_unlocked() else cluster.leader.timeline

# The generic cluster interface
def get_cluster(self, force=False):
    if force:
        self._bypass_caches()
    try:
        cluster = self._load_cluster()
    except Exception:
        self.reset_cluster()
        raise

    self._last_seen = int(time.time())

    with self._cluster_thread_lock:
        self._cluster = cluster
        self._cluster_valid_till = time.time() + self.ttl
        return cluster

@abc.abstractmethod
def _load_cluster(self):
    """Internally this method should build `Cluster` object which
    represents current state and topology of the cluster in DCS.
    this method supposed to be called only by `get_cluster` method.

    raise `~DCSError` in case of communication or other problems with DCS.
    If the current node was running as a master and exception raised,
    instance would be demoted."""
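Note that get_cluster also stamps _cluster_valid_till, i.e. how long the freshly loaded view may be treated as fresh; a cached accessor can then serve reads for up to ttl seconds without another DCS round trip. A standalone sketch of that idea (the class and method names here are hypothetical stand-ins, not Patroni's):

import time
import threading

class CachedClusterLoader:
    """Standalone illustration of the TTL-cache pattern around _load_cluster()."""

    def __init__(self, loader, ttl=30):
        self._loader = loader          # stands in for _load_cluster()
        self._ttl = ttl
        self._lock = threading.Lock()
        self._cluster = None
        self._valid_till = 0

    def get_cluster(self, force=False):
        with self._lock:
            # Serve the cached view while it is still fresh, unless forced.
            if not force and self._cluster and time.time() < self._valid_till:
                return self._cluster
        cluster = self._loader()       # may raise, just like _load_cluster()
        with self._lock:
            self._cluster = cluster
            self._valid_till = time.time() + self._ttl
            return cluster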
Take Kubernetes as the DCS, for example:
def _load_cluster(self):
    stop_time = time.time() + self._retry.deadline
    self._api.refresh_api_servers_cache()
    try:
        with self._condition:
            self._wait_caches(stop_time)

            members = [self.member(pod) for pod in self._pods.copy().values()]
            nodes = self._kinds.copy()

        config = nodes.get(self.config_path)
        metadata = config and config.metadata
        annotations = metadata and metadata.annotations or {}

        # get initialize flag
        initialize = annotations.get(self._INITIALIZE)

        # get global dynamic configuration
        config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                         annotations.get(self._CONFIG) or '{}',
                                         metadata.resource_version if self._CONFIG in annotations else 0)

        # get timeline history
        history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                            annotations.get(self._HISTORY) or '[]')

        leader = nodes.get(self.leader_path)
        metadata = leader and leader.metadata
        self._leader_resource_version = metadata.resource_version if metadata else None
        annotations = metadata and metadata.annotations or {}

        # get last known leader lsn
        last_lsn = annotations.get(self._OPTIME)
        try:
            last_lsn = 0 if last_lsn is None else int(last_lsn)
        except Exception:
            last_lsn = 0

        # get permanent slots state (confirmed_flush_lsn)
        slots = annotations.get('slots')
        try:
            slots = slots and json.loads(slots)
        except Exception:
            slots = None

        # get leader
        leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                         'ttl', 'renewTime', 'transitions') if n in annotations}
        if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
            self._leader_observed_record = leader_record
            self._leader_observed_time = time.time()

        leader = leader_record.get(self._LEADER)
        try:
            ttl = int(leader_record.get('ttl')) or self._ttl
        except (TypeError, ValueError):
            ttl = self._ttl

        if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
            leader = None

        if metadata:
            member = Member(-1, leader, None, {})
            member = ([m for m in members if m.name == leader] or [member])[0]
            leader = Leader(metadata.resource_version, None, member)

        # failover key
        failover = nodes.get(self.failover_path)
        metadata = failover and failover.metadata
        failover = Failover.from_node(metadata and metadata.resource_version,
                                      metadata and (metadata.annotations or {}).copy())

        # get synchronization state
        sync = nodes.get(self.sync_path)
        metadata = sync and sync.metadata
        sync = SyncState.from_node(metadata and metadata.resource_version,
                                   metadata and metadata.annotations)

        return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
    except Exception:
        logger.exception('get_cluster')
        raise KubernetesError('Kubernetes API is not responding properly')
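Everything _load_cluster consumes on Kubernetes lives in object annotations on Endpoints or ConfigMaps named <scope>-config, <scope>-leader, <scope>-sync, and <scope>-failover. As a quick out-of-band check, those annotations can be dumped with the official kubernetes Python client; the sketch below assumes a ConfigMap-based DCS and example scope/namespace names:

# Sketch: dump the annotations Patroni keeps on its DCS ConfigMaps.
# Scope and namespace are examples; adjust to your deployment.
from kubernetes import client, config
from kubernetes.client.rest import ApiException

config.load_kube_config()                 # or config.load_incluster_config()
v1 = client.CoreV1Api()

scope, namespace = 'pg142-1013-postgresql', 'default'
for suffix in ('config', 'leader', 'sync', 'failover'):
    try:
        cm = v1.read_namespaced_config_map('%s-%s' % (scope, suffix), namespace)
        print(suffix, '->', cm.metadata.annotations)
    except ApiException as e:
        print(suffix, '-> not readable (HTTP %s)' % e.status)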
Cluster state detection
if self.is_paused():
    self.watchdog.disable()
    self._was_paused = True
else:
    if self._was_paused:
        self.state_handler.schedule_sanity_checks_after_pause()
    self._was_paused = False

if not self.cluster.has_member(self.state_handler.name):
    self.touch_member()

# cluster has leader key but not initialize key
if not (self.cluster.is_unlocked() or self.sysid_valid(self.cluster.initialize)) and self.has_lock():
    self.dcs.initialize(create_new=(self.cluster.initialize is None), sysid=self.state_handler.sysid)

if not (self.cluster.is_unlocked() or self.cluster.config and self.cluster.config.data) and self.has_lock():
    self.dcs.set_config_value(json.dumps(self.patroni.config.dynamic_configuration,
                                         separators=(',', ':')))
    self.cluster = self.dcs.get_cluster()

if self._async_executor.busy:
    return self.handle_long_action_in_progress()

msg = self.handle_starting_instance()
if msg is not None:
    return msg

# we've got here, so any async action has finished.
if self.state_handler.bootstrapping:
    return self.post_bootstrap()

if self.recovering:
    self.recovering = False

    if not self._rewind.is_needed:
        # Check if we tried to recover from postgres crash and failed
        msg = self.post_recover()
        if msg is not None:
            return msg

    # Reset some states after postgres successfully started up
    self._crash_recovery_executed = False
    if self._rewind.executed and not self._rewind.failed:
        self._rewind.reset_state()

    # The Raft cluster without a quorum takes a bit of time to stabilize.
    # Therefore we want to postpone the leader race if we just started up.
    if self.cluster.is_unlocked() and self.dcs.__class__.__name__ == 'Raft':
        return 'started as a secondary'
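The state this handler inspects internally can also be observed from outside through Patroni's REST API: GET /patroni returns the local node's view and, in recent Patroni releases, GET /cluster returns the whole member list. A minimal probe, assuming the default REST address localhost:8008:

# Sketch: observe node and cluster state via Patroni's REST API.
import json
import urllib.request

base = 'http://localhost:8008'            # default Patroni REST listen address
for path in ('/patroni', '/cluster'):
    with urllib.request.urlopen(base + path) as resp:
        print(path, '->', json.dumps(json.load(resp), indent=2))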
Checking whether the cluster is paused
A paused cluster means the PostgreSQL nodes are no longer managed by Patroni: when the cluster runs into trouble, no failover or other remedial action is triggered.
Pausing is normally triggered by the user, for example for maintenance of a single PostgreSQL node. How to trigger it:

root@pg142-1013-postgresql-0:/home/postgres# patronictl list
+ Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
| Member                  | Host           | Role         | State   | TL | Lag in MB |
+-------------------------+----------------+--------------+---------+----+-----------+
| pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
| pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
+-------------------------+----------------+--------------+---------+----+-----------+
root@pg142-1013-postgresql-0:/home/postgres# patronictl pause
Success: cluster management is paused
root@pg142-1013-postgresql-0:/home/postgres# patronictl list
+ Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
| Member                  | Host           | Role         | State   | TL | Lag in MB |
+-------------------------+----------------+--------------+---------+----+-----------+
| pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
| pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
+-------------------------+----------------+--------------+---------+----+-----------+
 Maintenance mode: on
The output above shows that cluster management is now paused. The PostgreSQL processes themselves keep running, but if one of them fails, the user has to restart it manually.
To resume a paused cluster:

root@pg142-1013-postgresql-0:/home/postgres# patronictl list
+ Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
| Member                  | Host           | Role         | State   | TL | Lag in MB |
+-------------------------+----------------+--------------+---------+----+-----------+
| pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
| pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
+-------------------------+----------------+--------------+---------+----+-----------+
 Maintenance mode: on
root@pg142-1013-postgresql-0:/home/postgres# patronictl resume
Success: cluster management is resumed
root@pg142-1013-postgresql-0:/home/postgres# patronictl list
+ Cluster: pg142-1013-postgresql (7289263672843878470) ---+---------+----+-----------+
| Member                  | Host           | Role         | State   | TL | Lag in MB |
+-------------------------+----------------+--------------+---------+----+-----------+
| pg142-1013-postgresql-0 | 10.244.117.143 | Leader       | running |  3 |           |
| pg142-1013-postgresql-1 | 10.244.165.220 | Sync Standby | running |  3 |         0 |
+-------------------------+----------------+--------------+---------+----+-----------+
With this command, cluster management is back to normal.
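Under the hood, both commands are thin wrappers over the dynamic configuration: patronictl pause PATCHes {"pause": true} to the /config REST endpoint, and patronictl resume PATCHes {"pause": false}. A minimal sketch of doing the same directly, again assuming the default localhost:8008 address:

# Sketch: pause/resume cluster management via the REST /config endpoint.
import json
import urllib.request

def set_pause(paused, base='http://localhost:8008'):
    body = json.dumps({'pause': paused}).encode()
    req = urllib.request.Request(base + '/config', data=body, method='PATCH',
                                 headers={'Content-Type': 'application/json'})
    with urllib.request.urlopen(req) as resp:
        print(resp.status, json.load(resp).get('pause'))

set_pause(True)    # equivalent to: patronictl pause
set_pause(False)   # equivalent to: patronictl resume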
After resuming the cluster, the PostgreSQL nodes need some follow-up handling:
1. Reconfigure the PostgreSQL parameters;
2. Based on the leader and sync-standby names last recorded in xxx-sync, set up the synchronous replication slot on the primary (see the sketch after this list);
3. Check whether the recovered PostgreSQL nodes have diverged from the values last recorded in xxx-config; if they no longer match, the cluster cannot be restored.
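For step 2, the synchronous standby recorded in xxx-sync needs a physical replication slot on the primary. A minimal sketch of the check-and-create, assuming psycopg2, example connection details, and Patroni's convention of deriving the slot name from the member name with dashes mapped to underscores:

# Sketch: ensure a physical replication slot for the sync standby exists
# on the primary. Connection details and the member name are examples.
import psycopg2

member = 'pg142-1013-postgresql-1'            # from the xxx-sync record
slot = member.replace('-', '_')               # slot names disallow dashes

conn = psycopg2.connect('host=10.244.117.143 dbname=postgres user=postgres')
conn.autocommit = True
try:
    with conn.cursor() as cur:
        cur.execute('SELECT 1 FROM pg_replication_slots WHERE slot_name = %s', (slot,))
        if cur.fetchone() is None:
            cur.execute('SELECT pg_create_physical_replication_slot(%s)', (slot,))
finally:
    conn.close()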