北方的_南先生 2023-05-04 11:32 采纳率: 0%
浏览 25
已结题

利用Redis-Replicator实现Redis-Cluster集群同步

1.利用Redis-Replicator实现Redis-Cluster集群同步,源端备端都是三主三从,分布在三台CentOS7.6,4核8G的linux服务器上。在单机同步成功的基础上,开了三个线程,分别对源备的三个master进行点对点同步,结果源端1000w数据,备端只同步了300w不到,但是每个线程启动后立刻sleep 30s左右,1000w数据一个不少

public class MigrationExample {

public static void main(String[] args) throws IOException, URISyntaxException {
    sync("redis://127.0.0.1:6379", "redis://127.0.0.1:6380");
}

/*
 * Precondition:
 * 1. Make sure the two redis version is same.
 * 2. Make sure the single key-value is not very big.(highly recommend less then 1 MB)
 *
 * We running following steps to sync two redis.
 * 1. Get rdb stream from source redis.
 * 2. Convert source rdb stream to redis dump format.
 * 3. Use Jedis RESTORE command to restore that dump format to target redis.
 * 4. Get aof stream from source redis and sync to target redis.
 */
public static void sync(String sourceUri, String targetUri) throws IOException, URISyntaxException {
    RedisURI suri = new RedisURI(sourceUri);
    RedisURI turi = new RedisURI(targetUri);
    final ExampleClient target = new ExampleClient(turi.getHost(), turi.getPort());
    Configuration tconfig = Configuration.valueOf(turi);
    if (tconfig.getAuthPassword() != null) {
        Object auth = target.send(AUTH, tconfig.getAuthPassword().getBytes());
        System.out.println("AUTH:" + auth);
    }
    final AtomicInteger dbnum = new AtomicInteger(-1);
    Replicator r = dress(new RedisReplicator(suri));

    r.addEventListener(new EventListener() {
        @Override
        public void onEvent(Replicator replicator, Event event) {
            if (event instanceof DumpKeyValuePair) {
                DumpKeyValuePair dkv = (DumpKeyValuePair) event;
                // Step1: select db
                DB db = dkv.getDb();
                int index;
                if (db != null && (index = (int) db.getDbNumber()) != dbnum.get()) {
                    target.send(SELECT, toByteArray(index));
                    dbnum.set(index);
                    System.out.println("SELECT:" + index);
                }

                // Step2: restore dump data
                if (dkv.getExpiredMs() == null) {
                    Object r = target.restore(dkv.getKey(), 0L, dkv.getValue(), true);
                    System.out.println(r);
                } else {
                    long ms = dkv.getExpiredMs() - System.currentTimeMillis();
                    if (ms <= 0) return;
                    Object r = target.restore(dkv.getKey(), ms, dkv.getValue(), true);
                    System.out.println(r);
                }
            }

            if (event instanceof DefaultCommand) {
                // Step3: sync aof command
                DefaultCommand dc = (DefaultCommand) event;
                Object r = target.send(dc.getCommand(), dc.getArgs());
                System.out.println(r);
            }
        }
    });

    r.addCloseListener(new CloseListener() {
        @Override
        public void handle(Replicator replicator) {
            target.close();
        }
    });
    r.open();
}


 public static class ExampleClient implements Closeable {

    private Jedis jedis;

    public ExampleClient(final String host, final int port) {
        DefaultJedisClientConfig.Builder config = DefaultJedisClientConfig.builder();
        config.timeoutMillis(10000);
        this.jedis = new Jedis(new HostAndPort(host, port), config.build());
    }

    public Object send(Protocol.Command cmd, final byte[]... args) {
        Object r = jedis.sendCommand(cmd, args);
        if (r instanceof byte[]) {
            return Strings.toString(r);
        } else {
            return r;
        }
    }

    public Object send(final byte[] cmd, final byte[]... args) {
        return send(Protocol.Command.valueOf(Strings.toString(cmd).toUpperCase()), args);
    }

    public Object restore(byte[] key, long expired, byte[] dumped, boolean replace) {
        if (!replace) {
            return send(RESTORE, key, toByteArray(expired), dumped);
        } else {
            return send(RESTORE, key, toByteArray(expired), dumped, "REPLACE".getBytes());
        }
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}

这段代码实现了单机源备同步,现在把它单独写在一个Thread类里的run方法中,有三台源备开三个线程同时同步,CPU百分之两百多,备端同步数据不全。想定位到哪里资源耗尽了导致程序异常或者让这三个线程能顺序执行,完成任务
  • 写回答

1条回答 默认 最新

  • Zyb0627 2023-05-04 21:22
    关注

    引用chatGPT作答,从程序的CPU利用率超过100%的现象以及备端同步数据不全的情况,可以初步推断是多线程并发导致的资源竞争问题。具体的解决方案如下:

    添加日志打印信息:在代码中添加日志打印信息,可以帮助我们了解到程序的运行情况,包括每个线程的执行情况,以及出现异常时的堆栈信息等。

    添加锁机制:通过添加锁机制,可以避免多个线程同时访问同一个资源的问题。可以使用Java中的synchronized关键字或者Lock接口实现锁机制。

    调整线程数:在多线程并发时,线程数的多少也会对程序的执行效率产生影响。可以适当调整线程数,使其在一定范围内,达到最优的性能。

    调整Redis配置:在Redis中,可以通过修改配置文件的方式,调整内存使用、网络连接、并发数等参数,来优化Redis的性能。可以根据具体情况,适当调整Redis的配置。

    优化代码:在程序编写时,需要注意代码的质量和效率。可以通过合理的算法设计、优化IO操作等方式,提高程序的执行效率,减少资源占用。同时,需要及时处理异常,避免程序出现崩溃等情况。

    评论

报告相同问题?

问题事件

  • 系统已结题 5月12日
  • 创建了问题 5月4日

悬赏问题

  • ¥15 openFOAM DPMFoam
  • ¥15 将查询到的值,赋值到table指定行中
  • ¥50 docker容器内部启动shell脚本多命令
  • ¥15 请问python的selenium怎么设置referer
  • ¥15 请教下, VS QT 环境下, QTOPCUA 的源文件报错,这种情况咋查呢 ?
  • ¥20 UNITY webgl关于文档的上传和下载问题
  • ¥15 安霸cv22 + rtl8211f 千兆,udp传输丢包
  • ¥15 关于区块链和边缘环境搭建的相关问题
  • ¥15 windows远程桌面断卡重连软件卡顿问题
  • ¥30 Unity 实现扫描效果