accprenhailin 2012-05-08 17:54
浏览 420
已采纳

线程池实现socket 通信问题,不能长时间运行。

1.创建一个监听

[code="java"]import java.io.IOException;
import java.net.*;
import java.util.concurrent.Future;

/**
 * Server socket that accepts client connections and submits one
 * {@code CreateServer} task per connection to {@code Server.pool}.
 *
 * <p>The {@code Future} of every submitted task is appended to
 * {@code Server.Results}. Nothing in this class removes entries from that
 * queue. NOTE(review): confirm that some consumer drains it, otherwise it
 * grows with every accepted connection.
 */
public class Listener extends ServerSocket implements Runnable {

    /**
     * Opens the server socket on the port returned by
     * {@code Server.AppConfig.getLocalhost().getListenport()}.
     *
     * @throws IOException if the server socket cannot be opened
     */
    public Listener() throws IOException {
        super(Server.AppConfig.getLocalhost().getListenport());
    }

    /**
     * Accept loop. Runs until this server socket is closed; a failure while
     * handling one connection is logged and the loop keeps accepting.
     */
    @Override
    public void run() {
        // Stop once the server socket is closed; with while(true) a closed
        // socket makes accept() throw on every iteration and the loop spins
        // forever, logging one message per pass.
        while (!isClosed()) {
            Socket socket = null;
            boolean handedOff = false;
            try {
                socket = accept();
                CreateServer server = new CreateServer(socket, Server.pool);
                Future<Integer> result = Server.pool.submit(server);
                // From here on the task owns the socket.
                handedOff = true;
                Server.Results.add(result);
            } catch (Exception e) {
                // Log the exception itself: getMessage() may be null and
                // drops the exception type.
                Server.createMessage("Listener:" + e);
                if (socket != null && !handedOff) {
                    // The connection was accepted but never reached a worker;
                    // close it here so the descriptor is not leaked.
                    try {
                        socket.close();
                    } catch (IOException closeError) {
                        Server.createMessage("Listener:" + closeError);
                    }
                }
            }
        }
    }

}[/code]

  2. 创建一个解析 socket 的服务

[code="java"]import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.io.*;

import com.szcust.einfo.receiveEntity.serverEntity.DataText;

public class CreateServer implements Callable {
// Client connection handled by this task; assigned in the constructor.
private Socket client;
// Reader over the client's input stream (opened with the GB2312 charset in the constructor).
private Scanner in;
// Auto-flushing writer over the client's output stream.
private PrintWriter out;
// Helper used by call() to convert a received token into a DataText and to save it.
private Resolve resolve;
// Not read anywhere in the visible code. NOTE(review): unit unknown; the socket
// timeout actually applied in the constructor is the literal 30 * 60 * 1000 ms.
private int timeOut = 30;
// Set to the construction time in the constructor; not read in the visible code.
private Date lastTime;

/**
 * Prepares a task for one accepted client connection: sets a 30-minute read
 * timeout, increments {@code Server.ClientCount}, opens a GB2312 reader and
 * an auto-flushing writer on the socket, and sends the welcome banner.
 *
 * <p>If any of these steps fails, the error is logged and the socket is
 * closed; {@code in} and/or {@code out} may then be left {@code null}.
 *
 * @param s    the accepted client socket
 * @param pool not used by this constructor; kept so existing callers compile
 */
public CreateServer(Socket s, ExecutorService pool) {
    client = s;
    lastTime = new Date();
    resolve = new Resolve();
    try {
        client.setSoTimeout(30 * 60 * 1000);
        // Plain read-modify-write on a shared static int; call() decrements it
        // from pool threads, so the counter is not exact under concurrency.
        Server.ClientCount = Server.ClientCount + 1;
        in = new Scanner(client.getInputStream(), "GB2312");
        out = new PrintWriter(client.getOutputStream(), true);
        out.println("--- Welcome To Universtar Science & Technology Softwear System ---");
    } catch (Exception ex) {
        // Log the exception itself: getMessage() may be null and drops the type.
        Server.createMessage("Ex " + ex);
        // A half-initialised connection cannot be served; close it so the
        // socket is not leaked.
        if (client != null) {
            try {
                client.close();
            } catch (IOException closeEx) {
                Server.createMessage("Ex " + closeEx);
            }
        }
    }
}

/**
 * Reads whitespace-delimited tokens from the client until the input ends,
 * validates each with {@code check}, converts it to a {@code DataText} and
 * saves it. Tokens that fail validation or conversion are logged and skipped.
 * An exception while handling a token is logged and ends the session.
 *
 * <p>When the session ends for any reason, {@code Server.ClientCount} is
 * decremented and the socket is passed to {@code closeSocket}.
 *
 * @return the value of {@code Server.ClientCount} after the decrement
 */
@Override
public Integer call() {
    try {
        // Scanner.next() never returns null; at end of input it throws
        // NoSuchElementException, so hasNext() is the loop condition.
        // "in" is null when the constructor failed to open the streams.
        while (in != null && in.hasNext()) {
            String line = in.next();
            try {
                if (check(line)) {
                    DataText dataText = resolve.getDataTextBySoketString(line);
                    if (dataText != null) {
                        // Business logic: persist the parsed record.
                        resolve.saveDataRun(dataText);
                    } else {
                        Server.createMessage("Resolve error " + line);
                    }
                } else {
                    Server.createMessage("Check error " + line);
                }
            } catch (Exception ex) {
                // Log the exception itself: getMessage() may be null.
                Server.createMessage("Ex " + ex);
                // The connection is closed below; reading on would only fail.
                break;
            }
        }
    } finally {
        // Always release the connection slot and the socket, including on a
        // normal disconnect, a read timeout or an unexpected runtime exception.
        Server.ClientCount--;
        closeSocket(this.client);
    }
    return Server.ClientCount;
}[/code]
  3. 静态变量,用于保存各个线程之间的数据。

[code="java"]import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.szcust.einfo.receiveBiz.FactorBiz;
import com.szcust.einfo.receiveBiz.StationBiz;
import com.szcust.einfo.receiveEntity.einfoEntity.Station;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
import com.szcust.einfo.receiveEntity.configEntity.Config;
import com.szcust.einfo.receiveEntity.configEntity.ExLog;

public class Server{

// All collections below are null until init() has run.

// Queue of DataText records; created in init().
public static  ConcurrentLinkedQueue<DataText> Data_Array = null;
// Station lists; created empty in init().
public static Vector<Station> Client_Stations = null;
public static Vector<Station> Server_Stations = null;
// Message queue; created in init(). NOTE(review): presumably filled by createMessage() — confirm.
private static ConcurrentLinkedQueue<String> Message = null;
// Queue of error strings; created in init().
public static ConcurrentLinkedQueue<String> ErrorText = null;
// Futures of the CreateServer tasks submitted by Listener.run(); nothing in the
// visible code removes entries from this queue.
public static ConcurrentLinkedQueue<Future<Integer>> Results = null;

// Thread pool shared by all connection tasks; created once when the class is
// initialised and not recreated by init().
public static ExecutorService pool = Executors.newCachedThreadPool();
// Application configuration; created in init().
public static Config AppConfig = null;
// Incremented in the CreateServer constructor and decremented in
// CreateServer.call(); a plain int with no synchronization.
public static int ClientCount = 0;
// Set to true by init(); not read in the visible code.
public static boolean IsClear = true;

/**
 * Creates fresh, empty shared collections and a new Config, resets
 * ClientCount to 0 and IsClear to true, then calls loadData().
 *
 * The static fields it assigns are declared as null, so they are unusable
 * until this method has run. The thread pool is not touched here.
 */
public static void  init() {
    Data_Array =new ConcurrentLinkedQueue<DataText>();
    Client_Stations = new Vector<Station>();
    Server_Stations = new Vector<Station>();
    Message = new ConcurrentLinkedQueue<String>();
    ErrorText = new ConcurrentLinkedQueue<String>();
    Results = new ConcurrentLinkedQueue<Future<Integer>>();
    AppConfig = new Config();
    ClientCount = 0;
    IsClear = true;
    // Runs last, after every field above has been assigned.
    loadData();
}[/code]
  • 写回答

5条回答

  • iteye_7589 2012-05-16 11:28
    关注

    哦,我不是很清楚 MINA的实现,Netty里,是用线程池模型的,可以控制生成线程的上限。

    Mina里如果找不到控制线程数的地方,我建议还是用Netty吧。
    至少Netty算是公司在维护 :)

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(4条)

报告相同问题?

悬赏问题

  • ¥15 如何在scanpy上做差异基因和通路富集?
  • ¥20 关于#硬件工程#的问题,请各位专家解答!
  • ¥15 关于#matlab#的问题:期望的系统闭环传递函数为 $G(s)=\frac{\omega_n^2}{s^2+2\zeta\omega_n s+\omega_n^2}$,阻尼系数 $\zeta=0.707$,使系统具有较小的超调量
  • ¥15 FLUENT如何实现在堆积颗粒的上表面加载高斯热源
  • ¥30 截图中的mathematics程序转换成matlab
  • ¥15 动力学代码报错,维度不匹配
  • ¥15 Power query添加列问题
  • ¥50 Kubernetes&Fission&Elasticsearch
  • ¥15 報錯:Person is not mapped,如何解決?
  • ¥15 c++头文件不能识别CDialog