给定一个int数组,假设有10000个长度,里面放满1-100的随机整数。需要用串行循环计算、Executors框架和Fork-Join框架三种方法,实现查找并输出该数组中50的出现个数。
预期执行结果如下(具体数量根据每个程序随机赋值决定)
串行搜索得到50的个数是5个。
Executors搜索得到50的个数是5个。
Fork-Join搜索得到50的个数是5个。
给定一个int数组,假设有10000个长度,里面放满1-100的随机整数。需要用串行循环计算、Executors框架和Fork-Join框架三种方法,实现查找并输出该数组中50的出现个数。
预期执行结果如下(具体数量根据每个程序随机赋值决定)
串行搜索得到50的个数是5个。
Executors搜索得到50的个数是5个。
Fork-Join搜索得到50的个数是5个。
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class FindAndCount {
private static int length = 10000;
static int[] array = new int[length];
private static void init() {
Random random = new Random();
for (int i = 0; i < length; i++) {
array[i] = random.nextInt(100) + 1;
}
}
/**
* 串行统计
* @param target -- 统计目标值
* @return - 数量
*/
private static int serial(int target) {
int count = 0;
for (int i = 0; i < length; i++) {
if (array[i] == target) {
count++;
}
}
return count;
}
/**
* 线程池并行统计
* @param target -- 统计目标值
* @return - 数量
*/
private static int executor(int target) {
//并行10个线程统计
int threads = 10;
int size = length / threads;
CountDownLatch countDownLatch = new CountDownLatch(threads);
AtomicInteger count = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
while (threads > 0) {
executorService.execute(new RangeRunnable(size * (threads - 1), size * threads--) {
@Override
public void run() {
for (int i = startIndex; i < endIndex; i++) {
if (array[i] == target) {
count.getAndIncrement();
}
}
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
executorService.shutdown();
return count.get();
}
/**
* 工作窃取算法并行统计
* @param target -- 统计目标值
* @return - 数量
*/
private static int forkJoin(int target) {
//不使用自定义pool
//RangeTask rangeTask = new RangeTask(0, length, target);
//rangeTask.fork();
//return rangeTask.join();
//自己设置forkJoin线程池进行计算
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(new RangeTask(0, length, target));
try {
return forkJoinTask.get();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
forkJoinPool.shutdown();
}
}
/**
* 工作窃取算法并行统计
* java8提供的方法非常简洁
* @param target -- 统计目标值
* @return - 数量
*/
private static int forkJoinByStream(int target) {
//java8的stream
return (int)Arrays.stream(array).parallel().filter(o -> o == target).count();
}
public static void main(String[] args) {
//初始化数据
init();
//目标50
int target = 50;
System.out.println(serial(target));
System.out.println(executor(target));
System.out.println(forkJoinByStream(target));
System.out.println(forkJoin(target));
}
}
abstract class RangeRunnable implements Runnable {
final int startIndex;
final int endIndex;
RangeRunnable(int startIndex, int endIndex) {
this.startIndex = startIndex;
this.endIndex = endIndex;
}
}
class RangeTask extends RecursiveTask<Integer> {
private final int startIndex;
private final int endIndex;
private int target;
RangeTask(int startIndex, int endIndex, int target) {
this.startIndex = startIndex;
this.endIndex = endIndex;
this.target = target;
}
@Override
protected Integer compute() {
int count = 0;
if (endIndex - startIndex < 1000) {
for (int i = startIndex; i < endIndex; i++) {
if (FindAndCount.array[i] == target) {
count++;
}
}
return count;
} else {
int middle = (endIndex + startIndex) / 2;
RangeTask left = new RangeTask(startIndex, middle, target);
RangeTask right = new RangeTask(middle, endIndex, target);
left.fork();
right.fork();
return left.join() + right.join();
}
}
}
希望对你有帮助...