需求:
命令队列(存放n个命令,ip 端口 命令ID等信息),通过多线程scoket(线程数是20个).从命令队列里面取出 命令,解析执行。
连接的服务器,连接数是有限制的最大2个连接。所以scoket 必须维持长连接
最多可以有20个线程 连接20个不同IP的机器。但机器是N个。
问题1 怎么在多线程中维持长连接发送 接收数据,相同IP的命令发送接收完后需要断开连接。
需求:
命令队列(存放n个命令,ip 端口 命令ID等信息),通过多线程scoket(线程数是20个).从命令队列里面取出 命令,解析执行。
连接的服务器,连接数是有限制的最大2个连接。所以scoket 必须维持长连接
最多可以有20个线程 连接20个不同IP的机器。但机器是N个。
问题1 怎么在多线程中维持长连接发送 接收数据,相同IP的命令发送接收完后需要断开连接。
对我的思路作下修改应该能满足你的需求
[code="java"]
public class Demo {
public static void main(String[] args) throws UnknownHostException, IOException {
//我这里演示4个不同的IP的多任务任务队列
//已知条件提到20个线程不断开的连接20个IP的机器,那就只能对于某个IP只能连接它的一个端口
//只是演示一下,当做伪代码看
Queue<String[]> taskQueue = new LinkedList<String[]>();
List<String[]> list = Arrays.asList(
new String[]{"192.168.0.2", "2", "laugh"},
new String[]{"192.168.0.2", "2", "run"},
new String[]{"192.168.0.2", "2", "sing"},
new String[]{"192.168.0.3", "3", "run"},
new String[]{"192.168.0.3", "3", "sing"},
new String[]{"192.168.0.4", "4", "laugh"},
new String[]{"192.168.0.4", "4", "sing"},
new String[]{"192.168.0.5", "5", "sing"}
);
// < "ip:port", Client连接>
Map<String, Client> map = new HashMap<String, Client>();
for (String[] strings : list) {
String ip = strings[0];
int port = Integer.parseInt(strings[1]);
String comman = strings[2];
//标识连接某个IP的socket
String socketKey = ip + ":" + port;
Client client = map.get(socketKey);
//如果对于这个IP还没发起连接
if(client == null){
client = new Client(ip, port);
map.put(socketKey, client);
}else{
client.addTask(comman);
}
}
}
}
class Client extends Thread{
private int port;
private String host;
private Socket socket;
private PrintWriter writer;
private BufferedReader reader;
private boolean shutdown = false;
//每个发起的连接都有自己的任务队列
private Queue<String> taskQueue = new LinkedList<String>();
//一个客户端对应一个socket连接
public Client(String host, int port) {
//TODO validate
this.host = host;
this.port = port;
}
@Override
//这里只用一个线程处理任务,因为对一个IP只能建立一个连接
public void run() {
this.socket = new Socket();
try{
socket.connect(InetSocketAddress.createUnresolved(host, port));
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
String task;
//当还没通知关闭时,线程一直运行(同时连接也保持)
while(!shutdown){
//这里要取出队列中的任务,可能取任务的同时
//外面也传进任务,所以要加上同步块
synchronized (taskQueue) {
//当任务队列一直为空的时候一直循环,知道有任务或者通知关闭
while(taskQueue.isEmpty()){
if(shutdown){//如果需要关闭客户端,则返回
return;
}
//如果不需要关闭,则等待资源
//当队列中增加任务会被唤醒
try {
taskQueue.wait();
}catch (InterruptedException e) {}
}
//此时任务队列有任务进来
task = taskQueue.poll();
}
//执行任务不需要同步块保护
execTask(task);
}
}catch (Exception e) {
e.printStackTrace();
}finally{
closeAll();
}
}
public void addTask(String command) {
synchronized (taskQueue) {
taskQueue.add(command);
//唤醒等待任务队列资源的当前线程
taskQueue.notifyAll();
}
}
public void execTask (String command) {
writer.println(command);//当然PrintWriter.println用在网络上有点问题,先不管
}
class ReadServerInfoThread extends Thread{
@Override
public void run() {
try{
String line = null;
while( (line = reader.readLine()) != null ){
//打印服务器返回的消息
System.out.println(line);
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
public void closeAll() {
if(writer != null){
try {
writer.close();
} catch (Exception e2) {
}
}
if(reader != null){
try {
reader.close();
} catch (Exception e2) {
}
}
if(socket != null && !socket.isClosed()){
try {
socket.close();
} catch (Exception e2) {
}
}
}
}
[/code]