请教多线程JAVA问题

需求:

命令队列(存放n个命令,ip 端口 命令ID等信息),通过多线程scoket(线程数是20个).从命令队列里面取出 命令,解析执行。

连接的服务器,连接数是有限制的最大2个连接。所以scoket 必须维持长连接

最多可以有20个线程 连接20个不同IP的机器。但机器是N个。

问题1 怎么在多线程中维持长连接发送 接收数据,相同IP的命令发送接收完后需要断开连接。

3个回答

对我的思路作下修改应该能满足你的需求

[code="java"]
public class Demo {
public static void main(String[] args) throws UnknownHostException, IOException {

    //我这里演示4个不同的IP的多任务任务队列
    //已知条件提到20个线程不断开的连接20个IP的机器,那就只能对于某个IP只能连接它的一个端口
    //只是演示一下,当做伪代码看
    Queue<String[]> taskQueue = new LinkedList<String[]>();

    List<String[]> list = Arrays.asList(
            new String[]{"192.168.0.2", "2", "laugh"},  
            new String[]{"192.168.0.2", "2", "run"},  
            new String[]{"192.168.0.2", "2", "sing"},  
            new String[]{"192.168.0.3", "3", "run"},  
            new String[]{"192.168.0.3", "3", "sing"},
            new String[]{"192.168.0.4", "4", "laugh"},  
            new String[]{"192.168.0.4", "4", "sing"},
            new String[]{"192.168.0.5", "5", "sing"}
        );

    //  < "ip:port", Client连接>
    Map<String, Client> map = new HashMap<String, Client>();

    for (String[] strings : list) {
        String ip = strings[0];
        int port = Integer.parseInt(strings[1]);
        String comman = strings[2];

        //标识连接某个IP的socket
        String socketKey = ip + ":" + port;
        Client client = map.get(socketKey);

        //如果对于这个IP还没发起连接
        if(client == null){
            client = new Client(ip, port);
            map.put(socketKey, client);
        }else{
            client.addTask(comman);
        }
    }
}

}

class Client extends Thread{

private int port;
private String host;
private Socket socket;
private PrintWriter writer;
private BufferedReader reader;
private boolean shutdown = false;
//每个发起的连接都有自己的任务队列
private Queue<String> taskQueue = new LinkedList<String>();

//一个客户端对应一个socket连接
public Client(String host, int port) {
    //TODO validate
    this.host = host;
    this.port = port;
}

@Override
//这里只用一个线程处理任务,因为对一个IP只能建立一个连接
public void run() {
    this.socket = new Socket();
    try{
        socket.connect(InetSocketAddress.createUnresolved(host, port));
        reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);

        String task;
        //当还没通知关闭时,线程一直运行(同时连接也保持)
        while(!shutdown){
            //这里要取出队列中的任务,可能取任务的同时
            //外面也传进任务,所以要加上同步块
            synchronized (taskQueue) {
                //当任务队列一直为空的时候一直循环,知道有任务或者通知关闭
                while(taskQueue.isEmpty()){
                    if(shutdown){//如果需要关闭客户端,则返回
                        return;
                    }
                    //如果不需要关闭,则等待资源
                    //当队列中增加任务会被唤醒
                    try {
                        taskQueue.wait();
                    }catch (InterruptedException  e) {}
                }

                //此时任务队列有任务进来
                task = taskQueue.poll();
            }

            //执行任务不需要同步块保护
            execTask(task);

        }
    }catch (Exception e) {
        e.printStackTrace();
    }finally{
        closeAll();
    }
}

public void addTask(String command) {
    synchronized (taskQueue) {
        taskQueue.add(command);
        //唤醒等待任务队列资源的当前线程
        taskQueue.notifyAll();
    }
}

public void execTask (String command) {
    writer.println(command);//当然PrintWriter.println用在网络上有点问题,先不管
}

class ReadServerInfoThread extends Thread{
    @Override
    public void run() {
        try{
            String line = null;
            while( (line = reader.readLine()) != null ){
                //打印服务器返回的消息
                System.out.println(line);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public void closeAll() {
    if(writer != null){
        try {
            writer.close();
        } catch (Exception e2) {
        }
    }
    if(reader != null){
        try {
            reader.close();
        } catch (Exception e2) {
        }
    }
    if(socket != null && !socket.isClosed()){
        try {
            socket.close();
        } catch (Exception e2) {
        }
    }
}

}

[/code]

feizhuzi
feizhuzi 抱歉,我也没有启动client线程,client.start()
8 年多之前 回复
feizhuzi
feizhuzi 抱歉,我忘了关闭连接, 增加一个List<Client>, 当没有任务后遍历list,通知client可以准备关闭, client收到通知,在等待资源前检查是否没有任务且收到关闭通知。。。。。。
8 年多之前 回复

最大两个连接跟20个不同ip的机器还必须长连接貌似有点冲突,问题有点模糊、歧义,建议修改一下。

不关闭断开的socket就是长连接呀,你循环发送数据不久可以一直维持嘛,你的问题很诡异,整理下你的问题吧! :D

feizhuzi
feizhuzi 20个线程连接20个不同IP的机器,是否意味着命令队列中对于同一个IP不可能有两个不同的端口??
8 年多之前 回复
feizhuzi
feizhuzi 我明白了,晚上回你
8 年多之前 回复
doukk
doukk 就20个线程 不可能一直维持长连接等待相同IP的命令。有N台机器N条命令 接收上来的命令是一条一条的存放在队列里。 如:192.168.1.1 6 192.168.1.1 7 192.168.2.2 6 192.168.2.2 8 处理一条命令需要10分钟,取一个月的数据。 需求是想在建立一个连接(192.168.1.1)同时处理掉 相同IP的命令(如 6 7 ...),队列一直是接收状态
8 年多之前 回复
立即提问
相关内容推荐