Servlet实现排队处理视频转换请求中的疑问,贴代码请斧正!

项目需求:客户端上传视频(*.avi),server将请求排队,然后一个一个排队处理转换任务.
前面问过该问题,现在自己实现,由于多线程不精通. 如下实现该排队需求,自己感觉不太准确,有问题,请指出,并给个解决方法.
(tip,没有servlet的单线程模式或将dopost方法synchronied,视乎这样做是由系统将请求排队,request请求不能立即得到回复! :) )
1.TaskBean 写了一个bean,这个bean用来存储了转换任务的属性.
[code="java"]package videoconvert;
public class TaskBean {
private String videoPath;
private String flvPath;
private boolean isConverting;
public String getVideoPath() {
return videoPath;
}
public void setVideoPath(String videoPath) {
this.videoPath = videoPath;
}
public String getFlvPath() {
return flvPath;
}
public void setFlvPath(String flvPath) {
this.flvPath = flvPath;
}
public boolean getIsConverting() {
return isConverting;
}
public void setIsConverting(boolean isConverting) {
this.isConverting = isConverting;
}
}[/code]

2.VideoConvert 这是个servlet,用来排队请求,并调度转换工具进行转换.
  [code="java"]package videoconvert;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VideoConvert extends HttpServlet {
private static final long serialVersionUID = 6312986385178354217L;
private static Logger logger = LoggerFactory.getLogger(VideoConvert.class);
private static LinkedList taskStack = new LinkedList();

public VideoConvert() {
    super();
}

@Override
public void init() throws ServletException {
    // TODO Auto-generated method stub
    super.init();
    new Timer().schedule(new TimerTask() {
        public void run() {
            checkTaskList();[color=red]//这里按时间间隔检查栈.并取出task来执行.[/color]
        }
    }, 0, 10000);
}


protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
    doPost(request, response);
}


protected void doPost(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
    if (request.getRemoteAddr().equals("127.0.0.1")) {
        TaskBean task = new TaskBean();
        String videoPath = (String) request.getAttribute("VideoPath");
        String flvPath = (String) request.getAttribute("FlvPath");
        if (null != videoPath && null != flvPath) {
            task.setVideoPath(videoPath);
            task.setFlvPath(flvPath);
            task.setIsConverting(false);
            synchronized (taskStack) {       [color=red] //多线程以taskStack栈作为同步量,当持有taskStack对象才添加任务进栈[/color]
                taskStack.addLast(task);
                logger.info("add task to queue!");
            }
        } else {
            logger.info("the paramters are null!");
        }
    } else {
        logger.info("visit is illegal!");
    }

    response.setContentType("text/html; charset=UTF-8");
    PrintWriter pw = response.getWriter();
    pw.write("上传视频成功,等待转换....")
    pw.flush();
}

//这个是timer触发的任务,取出stack中的一个任务,然后检查是否在转换中,如果没有就调用工具进行转换.
private void checkTaskList() {
synchronized (taskStack) { [color=red]//这里感觉不准确,因为同步块里面,Timer新建的了一个线程p1来调用外
//部工具
// FFmpeg.并且会(process.waitFor();)等待FFmepg运行完成来返回结果. 所以在调
// 用外部工具的整个过程都占用了同步量(taskStack).所以应该如何改进...[/color]
TaskBean task = taskStack.peekFirst();
if (null != task) {
if (!task.getIsConverting()) {
task.setIsConverting(true);
executProcess(task);
}
}
}
}

private void executProcess(TaskBean task) {
    MediaUtility mu = new MediaUtility(); //在类MediaUtility 中调用新建线程调用工具,并waitfor()结果
    if (mu.video2Flv(task.getVideoPath(), task.getFlvPath())) {
            taskStack.removeFirst();
            logger.info("achieve video convert,removeFirst task!");
    } else {
        logger.error("video convert fail!");
        taskStack.removeFirst();
    }

}

}[/code]

3.调用工具的类 MediaUtility
[code="java"]package videoconvert;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MediaUtility {

private static Logger logger = LoggerFactory.getLogger(MediaUtility.class);

public  boolean video2Flv(String videoPath, String flvPath) {
    if (!checkfile(videoPath)) {
        logger.error(videoPath + " is not file!");
        return false;
    }
    String cmdStr = "ffmpeg -i " + videoPath + " " + flvPath;
    logger.info(cmdStr);
    logger.info("Starting convert video to flv....");
    BufferedReader ffmpegOut = null;
    try {
        Process process = Runtime.getRuntime().exec(cmdStr);
        // ffmpegOut = new BufferedReader(new InputStreamReader(process
        // .getInputStream()));
        ffmpegOut = new BufferedReader(new InputStreamReader(process
                .getErrorStream()));
        // FileWriter fileOut = new FileWriter(new File("c:/F.txt"));
        String dLine = "";
        while ((dLine = ffmpegOut.readLine()) != null) {
            logger.info(dLine);
            // fileOut.write(dLine);
        }
        // fileOut.close();
        process.waitFor();
        logger.info("***************************** end convert video *************************");
        ffmpegOut.close();
        return true;
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return false;
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return false;
    } finally {
        try {
            if (null != ffmpegOut) {
                ffmpegOut.close();
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public static boolean checkfile(String path) {
    File file = new File(path);
    if (!file.isFile()) {
        return false;
    }
    return true;
}

// public static void main(String[] args) {
// boolean bb = video2Flv("d:/ffmpeg/uploadVideo/Video1.wmv",
// "d:/ffmpeg/convertedVideo/pp1.flv");
// logger.info("the bb result is " + bb);
// boolean cc = video2Flv("d:/ffmpeg/uploadVideo/Video2.wmv",
// "d:/ffmpeg/convertedVideo/pp2.flv");
// logger.info("the cc result is " + cc);
//  }[/code]


如上面的代码.上面的代码每次只能有一个视频在转换,如果想改进为最多同时有3个或者6个任务在执行呢? 此时是否要再设置一个信号量呢?

[b]问题补充:[/b]
[quote]放一个线程池(ThreadPool),可以解决你的问题,每次把你的任务丢到线程池里面。[/quote]
只知道连接数据库用到线程池.请求排队也可以用线程池?有这样的应用吗? 8)
并且具体如何用,能给个例子吗?
[b]问题补充:[/b]
[quote]jdk的cocurrent里面有现成的。

Java代码
queue = new LinkedBlockingQueue(1000);

threadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, queue);

这样就定义了一个线程池。

Java代码
threadPool.execute(runnable);

这样就放入线程池一个Runnable了,就可以满足你的需求了。[/quote]

感谢taopian的热心回复,马上看jdk的cocurrent. :)
[b]问题补充:[/b]
[quote]jdk的cocurrent里面有现成的。 [/quote]
谢谢了,cocurrent包,感觉很好,应该就是解决问题的办法了. :idea:

另外,我还有一个不好很明白,就是
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
构造函数中的 BlockingQueue workQueue的作用? 感觉JVM可以在内部维护这个队列,不用释放出接口来.
当然尽管API上讲了" 大队列 用小池, 小队列 用大池," 可以提高资源利用率.然而现在还不是很理解.后面在实践中体会了. 嘿嘿

[b]问题补充:[/b]
先问再吃饭. :D
[quote]BlockingQueue workQueue

这个就是放一个队列,任务扔到队列里面,可以对并发做控制。

把这个队列暴露出来,能够让你进行很好的控制,并且你可以继承BlockingQueue,来加入你自己需要的功能,:)。[/quote]
嗯,是的,可以自己继承BlockingQueue,再增加一些自己的功能,然后放入executor中去. 面向接口 :)
现在我已经可以顺利,并准确的运行了. 最后一个疑问:
executor.excute(new Runnable(){
public void run(){
run 中的变量要求是final的.final变量的GC时间是否会有影响呢? 或者有别的更好解决方法
}
}
)

我是这样的改的: 这个bean被迫为final的了.呵呵,有更好的方法吗?
[code="java"]if (null != videoPath && null != flvPath) {
final TaskBean task = new TaskBean();
task.setVideoPath(videoPath);
task.setFlvPath(flvPath);
threadPool.execute(new Runnable(){
public void run(){
MediaUtility.video2Flv(task.getVideoPath(), task.getFlvPath());
}
});[/code]

哎呀,罗嗦了,吃饭先..... :D

4个回答

放一个线程池(ThreadPool),可以解决你的问题,每次把你的任务丢到线程池里面。

jdk的cocurrent里面有现成的。

[code="java"]
queue = new LinkedBlockingQueue(1000);
threadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, queue);
[/code]

这样就定义了一个线程池。

[code="java"]
threadPool.execute(runnable);
[/code]

这样就放入线程池一个Runnable了,就可以满足你的需求了。

用队列不就可以了吗?

[code="java"]
BlockingQueue workQueue
[/code]

这个就是放一个队列,任务扔到队列里面,可以对并发做控制。

把这个队列暴露出来,能够让你进行很好的控制,并且你可以继承BlockingQueue,来加入你自己需要的功能,:)。

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问