accprenhailin 2012-05-08 09:54
浏览 420
已采纳

线程池实现socket 通信问题,不能长时间运行。

1.创建一个监听

[code="java"]import java.io.IOException;
import java.net.*;
import java.util.concurrent.Future;

/**
 * Server-side accept loop: a {@link ServerSocket} that is also a {@link Runnable}.
 *
 * <p>Each accepted connection is wrapped in a {@code CreateServer} task, submitted to
 * {@code Server.pool}, and the resulting {@link Future} is appended to {@code Server.Results}.
 * The loop ends once this server socket has been closed.
 */
public class Listener extends ServerSocket implements Runnable {

/**
 * Opens the server socket on the port returned by
 * {@code Server.AppConfig.getLocalhost().getListenport()}.
 * {@code Server.AppConfig} is dereferenced here, so it must be set before construction.
 *
 * @throws IOException if the server socket cannot be opened
 */
public Listener() throws IOException {
    super(Server.AppConfig.getLocalhost().getListenport());
}

/**
 * Accepts connections until this server socket is closed. Failures while accepting or
 * handing off a single connection are reported through {@code Server.createMessage} and do
 * not stop the loop.
 */
@Override
public void run() {
    // Once the server socket is closed, accept() fails on every call; looping on
    // isClosed() lets the thread finish instead of spinning and logging forever.
    while (!isClosed()) {
        Socket socket = null;
        boolean handedOff = false;
        try {
            socket = accept();
            CreateServer server = new CreateServer(socket, Server.pool);
            Future<Integer> result = Server.pool.submit(server);
            handedOff = true;
            // NOTE(review): nothing visible here ever removes entries from Server.Results,
            // so it grows by one Future per connection. Confirm that a consumer drains it.
            Server.Results.add(result);
        } catch (Exception e) {
            Server.createMessage("Listener:" + e.getMessage());
            // The worker owns the socket only after a successful submit; before that
            // point nobody else will close it, so release it here.
            if (!handedOff) {
                closeQuietly(socket);
            }
        }
    }
}

/** Closes an accepted socket that was never handed to a worker; reports close failures. */
private static void closeQuietly(Socket socket) {
    if (socket == null) {
        return;
    }
    try {
        socket.close();
    } catch (IOException closeError) {
        Server.createMessage("Listener:" + closeError.getMessage());
    }
}

}[/code]

  2. 创建一个解析 socket 的服务

[code="java"]import java.net.Socket;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.io.*;

import com.szcust.einfo.receiveEntity.serverEntity.DataText;

public class CreateServer implements Callable {
// Connection to the remote client; assigned from the constructor argument.
private Socket client;
// Token reader over the client's input stream (opened with the GB2312 charset in the constructor).
// Stays null if the constructor failed before the stream was opened.
private Scanner in;
// Auto-flushing writer over the client's output stream; used in the constructor to send the greeting.
private PrintWriter out;
// Parser/persister for incoming tokens; call() uses it to build and save DataText objects.
private Resolve resolve;
// NOTE(review): not referenced by any code visible here; the socket timeout in the
// constructor is hard-coded to 30 * 60 * 1000 ms rather than derived from this value.
private int timeOut = 30;
// Set to the construction time; not read by any code visible here.
private Date lastTime;

public CreateServer(Socket s, ExecutorService pool) {
    client = s;
    lastTime = new Date();
    resolve = new Resolve();
    try {
        client.setSoTimeout(30 * 60 * 1000);
        Server.ClientCount = Server.ClientCount + 1;
        in = new Scanner(client.getInputStream(), "GB2312");

        // in = new BufferedReader(new
        // InputStreamReader(client.getInputStream(), "GB2312"));
        out = new PrintWriter(client.getOutputStream(), true);
        out
                .println("--- Welcome To Universtar Science & Technology Softwear System ---");

    } catch (Exception ex) {
        Server.createMessage("Ex " + ex.getMessage());
    }
}

/**
 * Reads whitespace-delimited tokens from the client until the input ends, validating each
 * with {@code check} and passing valid ones to the resolver for parsing and saving.
 *
 * <p>The loop ends when the stream is exhausted (including a read error or socket timeout,
 * which {@link Scanner} reports as "no more input") or when processing a token throws.
 * In every case the socket is closed and {@code Server.ClientCount} is decremented once.
 *
 * @return the value of {@code Server.ClientCount} after this handler has finished
 */
@Override
public Integer call() {
    if (in == null) {
        // The constructor could not open the input stream, so there is nothing to read.
        // The counter is left untouched: the constructor may have failed either before
        // or after incrementing it, and that cannot be told apart from here.
        closeSocket(this.client);
        return Server.ClientCount;
    }
    try {
        // Scanner.next() never returns null; it throws NoSuchElementException when the
        // input is exhausted. hasNext() is the correct end-of-stream test.
        while (in.hasNext()) {
            String line = in.next();
            try {
                if (!check(line)) {
                    Server.createMessage("Check error " + line);
                    continue;
                }
                DataText dataText = resolve.getDataTextBySoketString(line);
                if (dataText != null) {
                    resolve.saveDataRun(dataText); // business logic
                } else {
                    Server.createMessage("Resolve error " + line);
                }
            } catch (Exception ex) {
                Server.createMessage("Ex " + ex.getMessage());
                // The connection is dropped after a processing failure, so stop reading
                // instead of looping on a socket that is about to be closed.
                break;
            }
        }
    } finally {
        // A non-null 'in' means the constructor got past its increment, so balance it
        // here exactly once, then release the socket on every exit path.
        // NOTE(review): ClientCount is a plain static int updated from several worker
        // threads; the update is not atomic.
        Server.ClientCount--;
        // NOTE(review): assumes closeSocket does not itself adjust ClientCount - confirm.
        closeSocket(this.client);
    }
    return Server.ClientCount;
}[/code]
  3. 静态变量,用于保存各个线程之间的数据。

[code="java"]import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.szcust.einfo.receiveBiz.FactorBiz;
import com.szcust.einfo.receiveBiz.StationBiz;
import com.szcust.einfo.receiveEntity.einfoEntity.Station;
import com.szcust.einfo.receiveEntity.serverEntity.DataText;
import com.szcust.einfo.receiveEntity.configEntity.Config;
import com.szcust.einfo.receiveEntity.configEntity.ExLog;

public class Server{

// Queue of parsed DataText items; created in init(). The only visible use is commented out in CreateServer.call().
public static  ConcurrentLinkedQueue<DataText> Data_Array = null;
// Station lists; created empty in init(). Not otherwise used in the code visible here.
public static Vector<Station> Client_Stations = null;
public static Vector<Station> Server_Stations = null;
// Message queue; created in init(). Presumably filled by createMessage(...) - not visible here, confirm.
private static ConcurrentLinkedQueue<String> Message = null;
// Created in init(); not otherwise used in the code visible here.
public static ConcurrentLinkedQueue<String> ErrorText = null;
// One Future per accepted connection, appended by Listener.run().
public static ConcurrentLinkedQueue<Future<Integer>> Results = null;

// Shared executor for connection handlers. A cached thread pool creates threads on demand,
// so every open connection occupies one thread.
public static ExecutorService pool = Executors.newCachedThreadPool();
// Application configuration; created in init() and read by Listener to obtain the listen port.
public static Config AppConfig = null;
// Connection counter: incremented in the CreateServer constructor, decremented in CreateServer.call().
// NOTE(review): plain int updated from multiple threads without synchronization.
public static int ClientCount = 0;
// Reset to true in init(); meaning not visible from the code shown here.
public static boolean IsClear = true;

/**
 * Resets the shared static state: replaces every queue and station list with a new empty
 * instance, creates a new Config, resets ClientCount to 0 and IsClear to true, and finally
 * calls loadData().
 *
 * The thread pool is not re-created here; it is initialized once by its field initializer.
 * The containers are null until this method has run, so it must be called before any code
 * that uses them.
 */
public static void  init() {
    Data_Array =new ConcurrentLinkedQueue<DataText>();
    Client_Stations = new Vector<Station>();
    Server_Stations = new Vector<Station>();
    Message = new ConcurrentLinkedQueue<String>();
    ErrorText = new ConcurrentLinkedQueue<String>();
    Results = new ConcurrentLinkedQueue<Future<Integer>>();
    AppConfig = new Config();
    ClientCount = 0;
    IsClear = true;
    // Runs last, after all containers and the configuration have been replaced.
    loadData();
}[/code]

展开全部

  • 写回答

5条回答 默认 最新

  • iteye_7589 2012-05-16 03:28
    关注

    哦,我不是很清楚 MINA的实现,Netty里,是用线程池模型的,可以控制生成线程的上限。

    Mina里如果找不到控制线程数的地方,我建议还是用Netty吧。
    至少Netty算是公司在维护 :)

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(4条)
编辑
预览

报告相同问题?

悬赏问题

  • ¥15 关于#c##的问题:treenode反序列化后获取不到上一节点和下一节点,Fullpath和Handle报错
  • ¥15 一部手机能否同时用不同的app进入不同的直播间?
  • ¥15 没输出运行不了什么问题
  • ¥20 输入import torch显示Intel MKL FATAL ERROR,系统驱动1%,: Cannot load mkl_intel_thread.dll.
  • ¥15 点云密度大则包围盒小
  • ¥15 nginx使用nfs进行服务器的数据共享
  • ¥15 C#i编程中so-ir-192编码的字符集转码UTF8问题
  • ¥15 51嵌入式入门按键小项目
  • ¥30 海外项目,如何降低Google Map接口费用?
  • ¥15 fluentmeshing