线程池实现socket 通信问题,不能长时间运行。

1.创建一个监听

[code="java"]import java.io.IOException;
import java.net.*;
import java.util.concurrent.Future;

public class Listener extends ServerSocket implements Runnable {

public Listener() throws IOException {
    super(Server.AppConfig.getLocalhost().getListenport());
}

@Override
public void run() {
    while (true) {
        try {
            Socket socket = accept();
            CreateServer server = new CreateServer(socket, Server.pool);
            Future<Integer> result = Server.pool.submit(server);                
            Server.Results.add(result);
        } catch (Exception e) {
            Server.createMessage("Listener:"+e.getMessage());
        } finally {
        }
    }

}

}[/code]

  1. 创建一个解析socket的 服务

[code="java"]import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.io.*;

import com.szcust.einfo.receiveEntity.serverEntity.DataText;

public class CreateServer implements Callable {
private Socket client;
private Scanner in;
private PrintWriter out;
private Resolve resolve;
private int timeOut = 30;
private Date lastTime;

public CreateServer(Socket s, ExecutorService pool) {
    client = s;
    lastTime = new Date();
    resolve = new Resolve();
    try {
        client.setSoTimeout(30 * 60 * 1000);
        Server.ClientCount = Server.ClientCount + 1;
        in = new Scanner(client.getInputStream(), "GB2312");

        // in = new BufferedReader(new
        // InputStreamReader(client.getInputStream(), "GB2312"));
        out = new PrintWriter(client.getOutputStream(), true);
        out
                .println("--- Welcome To Universtar Science & Technology Softwear System ---");

    } catch (Exception ex) {
        Server.createMessage("Ex " + ex.getMessage());
    }
}

@Override
public Integer call() {
    String line = "";
    while ((line = in.next()) != null) {
        try {
            if (check(line)) {
                DataText dataText = resolve.getDataTextBySoketString(line);
                if (dataText != null) {
                    // Server.Data_Array.add(dataText);
                    resolve.saveDataRun(dataText);//业务代码
                } else {
                    Server.createMessage("Resolve error " + line);
                }
            } else {
                Server.createMessage("Check error " + line);
            }

        } catch (Exception ex) {
            Server.createMessage("Ex " + ex.getMessage());
            closeSocket(this.client );
        }
    }
    Server.ClientCount--;
    return Server.ClientCount;
}[/code]
  1. 静态变量,用于保存各个线程之间的数据。

[code="java"]import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.szcust.einfo.receiveBiz.FactorBiz;
import com.szcust.einfo.receiveBiz.StationBiz;
import com.szcust.einfo.receiveEntity.einfoEntity.Station;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
import com.szcust.einfo.receiveEntity.configEntity.Config;
import com.szcust.einfo.receiveEntity.configEntity.ExLog;

public class Server{

public static  ConcurrentLinkedQueue<DataText> Data_Array = null;
public static Vector<Station> Client_Stations = null;
public static Vector<Station> Server_Stations = null;
private static ConcurrentLinkedQueue<String> Message = null;
public static ConcurrentLinkedQueue<String> ErrorText = null;
public static ConcurrentLinkedQueue<Future<Integer>> Results = null;

public static ExecutorService pool = Executors.newCachedThreadPool();
public static Config AppConfig = null;
public static int ClientCount = 0;
public static boolean IsClear = true;

public static void  init() {
    Data_Array =new ConcurrentLinkedQueue<DataText>();
    Client_Stations = new Vector<Station>();
    Server_Stations = new Vector<Station>();
    Message = new ConcurrentLinkedQueue<String>();
    ErrorText = new ConcurrentLinkedQueue<String>();
    Results = new ConcurrentLinkedQueue<Future<Integer>>();
    AppConfig = new Config();
    ClientCount = 0;
    IsClear = true;
    loadData();
}[/code]

5个回答

哦,我不是很清楚 MINA的实现,Netty里,是用线程池模型的,可以控制生成线程的上限。

Mina里如果找不到控制线程数的地方,我建议还是用Netty吧。
至少Netty算是公司在维护 :)

Blocking Socket不适合用线程池处理通讯。
单个线程会被挂起的。
用NIO才合适。

个人觉得没必要在accept后submit一个Callable

直接execute一个Runnable

同步好ClientCount就可以了吧

你的项目可以用外部库吗?

你可以看看Netty这个NIO库。(或者apache mina,netty是mina的创始者写的,我用起来蛮方便的。)

NIO是基于SELECT模型的。就是注册一批关注对象socketChannel,
然后重复调用Select方法,当SocketChannel处于指定状态,比如 有数据,可写
等状态,再拿出来处理。
这样,可以分成主线程持续select,select出来需要处理的socketchannel放到一个队列,由threadpool里分线程取出来 处理。

因为socketchannel操作不会阻塞,所以可以充分利用多线程。

而你的代码用的是普通socket,普通socket的特性就是在read时,如果没有数据,则会阻塞住。而如果想利用线程池,应当是read没有数据时,继续往下走,把当前线程资源还回线程池。

不用你主动结束那些线程,那些线程被应该会被设为 daemon=true,即主线程结束时自动结束。平时,等待 主监听线程传需要处理的任务进来,如果没有,则自己wait。

那些就是线程池里的线程。

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐