项目需求:客户端上传视频(*.avi),server将请求排队,然后一个一个排队处理转换任务.
前面问过该问题,现在自己实现,由于多线程不精通. 如下实现该排队需求,自己感觉不太准确,有问题,请指出,并给个解决方法.
(tip,没有servlet的单线程模式或将dopost方法synchronied,视乎这样做是由系统将请求排队,request请求不能立即得到回复! :) )
1.TaskBean 写了一个bean,这个bean用来存储了转换任务的属性.
[code="java"]package videoconvert;
public class TaskBean {
private String videoPath;
private String flvPath;
private boolean isConverting;
public String getVideoPath() {
return videoPath;
}
public void setVideoPath(String videoPath) {
this.videoPath = videoPath;
}
public String getFlvPath() {
return flvPath;
}
public void setFlvPath(String flvPath) {
this.flvPath = flvPath;
}
public boolean getIsConverting() {
return isConverting;
}
public void setIsConverting(boolean isConverting) {
this.isConverting = isConverting;
}
}[/code]
2.VideoConvert 这是个servlet,用来排队请求,并调度转换工具进行转换.
[code="java"]package videoconvert;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VideoConvert extends HttpServlet {
private static final long serialVersionUID = 6312986385178354217L;
private static Logger logger = LoggerFactory.getLogger(VideoConvert.class);
private static LinkedList taskStack = new LinkedList();
public VideoConvert() {
super();
}
@Override
public void init() throws ServletException {
// TODO Auto-generated method stub
super.init();
new Timer().schedule(new TimerTask() {
public void run() {
checkTaskList();[color=red]//这里按时间间隔检查栈.并取出task来执行.[/color]
}
}, 0, 10000);
}
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
doPost(request, response);
}
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
if (request.getRemoteAddr().equals("127.0.0.1")) {
TaskBean task = new TaskBean();
String videoPath = (String) request.getAttribute("VideoPath");
String flvPath = (String) request.getAttribute("FlvPath");
if (null != videoPath && null != flvPath) {
task.setVideoPath(videoPath);
task.setFlvPath(flvPath);
task.setIsConverting(false);
synchronized (taskStack) { [color=red] //多线程以taskStack栈作为同步量,当持有taskStack对象才添加任务进栈[/color]
taskStack.addLast(task);
logger.info("add task to queue!");
}
} else {
logger.info("the paramters are null!");
}
} else {
logger.info("visit is illegal!");
}
response.setContentType("text/html; charset=UTF-8");
PrintWriter pw = response.getWriter();
pw.write("上传视频成功,等待转换....")
pw.flush();
}
//这个是timer触发的任务,取出stack中的一个任务,然后检查是否在转换中,如果没有就调用工具进行转换.
private void checkTaskList() {
synchronized (taskStack) { [color=red]//这里感觉不准确,因为同步块里面,Timer新建的了一个线程p1来调用外
//部工具
// FFmpeg.并且会(process.waitFor();)等待FFmepg运行完成来返回结果. 所以在调
// 用外部工具的整个过程都占用了同步量(taskStack).所以应该如何改进...[/color]
TaskBean task = taskStack.peekFirst();
if (null != task) {
if (!task.getIsConverting()) {
task.setIsConverting(true);
executProcess(task);
}
}
}
}
private void executProcess(TaskBean task) {
MediaUtility mu = new MediaUtility(); //在类MediaUtility 中调用新建线程调用工具,并waitfor()结果
if (mu.video2Flv(task.getVideoPath(), task.getFlvPath())) {
taskStack.removeFirst();
logger.info("achieve video convert,removeFirst task!");
} else {
logger.error("video convert fail!");
taskStack.removeFirst();
}
}
}[/code]
3.调用工具的类 MediaUtility
[code="java"]package videoconvert;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MediaUtility {
private static Logger logger = LoggerFactory.getLogger(MediaUtility.class);
public boolean video2Flv(String videoPath, String flvPath) {
if (!checkfile(videoPath)) {
logger.error(videoPath + " is not file!");
return false;
}
String cmdStr = "ffmpeg -i " + videoPath + " " + flvPath;
logger.info(cmdStr);
logger.info("Starting convert video to flv....");
BufferedReader ffmpegOut = null;
try {
Process process = Runtime.getRuntime().exec(cmdStr);
// ffmpegOut = new BufferedReader(new InputStreamReader(process
// .getInputStream()));
ffmpegOut = new BufferedReader(new InputStreamReader(process
.getErrorStream()));
// FileWriter fileOut = new FileWriter(new File("c:/F.txt"));
String dLine = "";
while ((dLine = ffmpegOut.readLine()) != null) {
logger.info(dLine);
// fileOut.write(dLine);
}
// fileOut.close();
process.waitFor();
logger.info("***************************** end convert video *************************");
ffmpegOut.close();
return true;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return false;
} finally {
try {
if (null != ffmpegOut) {
ffmpegOut.close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static boolean checkfile(String path) {
File file = new File(path);
if (!file.isFile()) {
return false;
}
return true;
}
// public static void main(String[] args) {
// boolean bb = video2Flv("d:/ffmpeg/uploadVideo/Video1.wmv",
// "d:/ffmpeg/convertedVideo/pp1.flv");
// logger.info("the bb result is " + bb);
// boolean cc = video2Flv("d:/ffmpeg/uploadVideo/Video2.wmv",
// "d:/ffmpeg/convertedVideo/pp2.flv");
// logger.info("the cc result is " + cc);
// }[/code]
如上面的代码.上面的代码每次只能有一个视频在转换,如果想改进为最多同时有3个或者6个任务在执行呢? 此时是否要再设置一个信号量呢?
[b]问题补充:[/b]
[quote]放一个线程池(ThreadPool),可以解决你的问题,每次把你的任务丢到线程池里面。[/quote]
只知道连接数据库用到线程池.请求排队也可以用线程池?有这样的应用吗? 8)
并且具体如何用,能给个例子吗?
[b]问题补充:[/b]
[quote]jdk的cocurrent里面有现成的。
Java代码
queue = new LinkedBlockingQueue(1000);
threadPool = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, queue);
这样就定义了一个线程池。
Java代码
threadPool.execute(runnable);
这样就放入线程池一个Runnable了,就可以满足你的需求了。[/quote]
感谢taopian的热心回复,马上看jdk的cocurrent. :)
[b]问题补充:[/b]
[quote]jdk的cocurrent里面有现成的。 [/quote]
谢谢了,cocurrent包,感觉很好,应该就是解决问题的办法了. :idea:
另外,我还有一个不好很明白,就是
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
构造函数中的 BlockingQueue workQueue的作用? 感觉JVM可以在内部维护这个队列,不用释放出接口来.
当然尽管API上讲了" 大队列 用小池, 小队列 用大池," 可以提高资源利用率.然而现在还不是很理解.后面在实践中体会了. 嘿嘿
[b]问题补充:[/b]
先问再吃饭. :D
[quote]BlockingQueue workQueue
这个就是放一个队列,任务扔到队列里面,可以对并发做控制。
把这个队列暴露出来,能够让你进行很好的控制,并且你可以继承BlockingQueue,来加入你自己需要的功能,:)。[/quote]
嗯,是的,可以自己继承BlockingQueue,再增加一些自己的功能,然后放入executor中去. 面向接口 :)
现在我已经可以顺利,并准确的运行了. 最后一个疑问:
executor.excute(new Runnable(){
public void run(){
run 中的变量要求是final的.final变量的GC时间是否会有影响呢? 或者有别的更好解决方法
}
}
)
我是这样的改的: 这个bean被迫为final的了.呵呵,有更好的方法吗?
[code="java"]if (null != videoPath && null != flvPath) {
final TaskBean task = new TaskBean();
task.setVideoPath(videoPath);
task.setFlvPath(flvPath);
threadPool.execute(new Runnable(){
public void run(){
MediaUtility.video2Flv(task.getVideoPath(), task.getFlvPath());
}
});[/code]
哎呀,罗嗦了,吃饭先..... :D