本文共 3421 字,大约阅读时间需要 11 分钟。
如果要处理一千条数据,你可能需要瞬间启动一千个线程。然而,需要仔细处理主线程与子线程之间的同步问题。而你的代码最大的问题在于,如何让主线程能够方便地获取子线程的处理结果。这种场景在实际项目中尤为重要。
ProcessNumTask[] tasks = new ProcessNumTask[tempList.size()]; for(int i=0; i ce = new ConcurrentExcutor(tasks, 5, result); ce.excute(); import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConcurrentExcutor { private Callable[] tasks; private int numb; private List result; public ConcurrentExcutor() {} public ConcurrentExcutor(Callable[] tasks, int numb) { super(); this.tasks = tasks; this.numb = numb; } public ConcurrentExcutor(Callable[] tasks, int numb, List result) { super(); this.tasks = tasks; this.numb = numb; this.result = result; } public void excute() { if(tasks == null || numb < 1) return; int num = tasks.length; if(num == 0) return; for(int i=0; i < (num/numb)+1; i++) { ExecutorService es = Executors.newCachedThreadPool(); Future[] futureArray = new Future[num]; for(int j=i*numb; j<(i+1)*numb; j++) { if(j+1 > num) break; futureArray[j%numb] = es.submit(tasks[j]); } if(result != null) { for(Future future : futureArray) { try { if(future != null) { T o = future.get(); result.add(o); } } catch(InterruptedException e) { System.out.println("处理Future时发生InterruptedException异常:" + future.toString()); e.printStackTrace(); } catch(ExecutionException e) { System.out.println("处理Future时发生ExecutionException异常:" + future.toString()); e.printStackTrace(); } } } es.shutdown(); } }} Callable和Future是Java在后续版本中为了适应多线程环境而引入的接口。Callable类似于Runnable,但它可以定义一个返回值的方法call(), 而Runnable只能执行无返回值的run()方法。此外,Callable的call()方法可以抛出异常,而Runnable则不行。
public interface Callable { T call() throws Exception; } Future表示异步计算的结果。它提供了检查任务是否完成的方法,以及等待任务完成并获取结果的方法。如果使用Future.cancel()方法,可以取消任务的执行。cancel()方法有一个布尔参数,参数为true表示立即中断任务的执行,参数为false表示允许正在运行的任务完成。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class Test { public static void main(String[] args) { try { List list = new ArrayList<>(); for(int i=0; i<100; i++) { list.add(i+","); } System.out.println(new Test().list2Str(list, 5)); } catch(Exception e) { e.printStackTrace(); } } public String list2Str(List list, final int nThreads) throws Exception { if(list == null || list.isEmpty()) return ""; StringBuffer ret = new StringBuffer(); ExecutorService executorService = Executors.newFixedThreadPool(nThreads); List> futures = new ArrayList<>(); for(int i=0; i subList = list.subList(i * list.size() / nThreads, (i+1) * list.size() / nThreads); Callable task = new Callable() { @Override public String call() throws Exception { StringBuffer sb = new StringBuffer(); for(String str : subList) { sb.append('[' + str + ']'); } return sb.toString(); } }; futures.add(executorService.submit(task)); } for(Future future : futures) { ret.append(future.get()); } executorService.shutdown(); return ret.toString(); } }} 转载地址:http://jhixz.baihongyu.com/