当前位置:网站首页>Future & CompletionService
Future & CompletionService
2022-06-27 10:24:00 【InfoQ】
How to create threads
- Inherit Thread class
- Realization Runable Interface
- Realization Callable Interface
- Using thread pools
- Cannot return a return value
- Can't throw checked Exception
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
new Thread(() -> {
System.out.println(" adopt Runnable Way to perform tasks ");
}).start();
FutureTask task = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
System.out.println(" adopt Callable Way to perform tasks ");
// wait for 3s Simulation execution task
Thread.sleep(3000);
return " Return to task result ";
}
});
new Thread(task).start();
System.out.println(" result :" + task.get());
}
}
Future
getConstruction method
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
Common methods
boolean cancel (boolean mayInterruptIfRunning)
- Cancel the execution of the task . Parameter specifies whether to interrupt task execution immediately , Or wait until the mission is over .
boolean isCancelled ()
- Whether the task has been cancelled , Cancel the task before it is completed , Then return to true.
boolean isDone ()
- Whether the task has been completed . It should be noted that if the task terminates normally 、 Exception or cancel , Will return to true.
V get () throws InterruptedException, ExecutionException
- Wait for the task to finish , Then get V Result of type .InterruptedException Thread interrupted exception ,ExecutionException Task execution exception , If the mission is cancelled , And throw CancellationException .
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException, TimeoutException
- Same as above get Function as , Too much time out . Parameters timeout Specify the timeout period ,uint Specify the unit of time , In enumerating classes TimeUnit There are related definitions in . If the calculation times out , Will throw out TimeoutException.
FutureTask

Usage mode
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// Realization Callable
Task task = new Task();
// structure futureTask
FutureTask<Integer> futureTask = new FutureTask<>(task);
// As Runnable Enter the reference
new Thread(futureTask).start();
System.out.println("task Running results :" + futureTask.get());
}
static class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(" The child thread is calculating ");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
}
Use scenarios

public class FutureTaskDemo2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
FutureTask<String> ft3 = new FutureTask<>(new T3Task());
FutureTask<String> ft4 = new FutureTask<>(new T4Task());
FutureTask<String> ft5 = new FutureTask<>(new T5Task());
// Build thread pool
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(ft1);
executorService.submit(ft2);
executorService.submit(ft3);
executorService.submit(ft4);
executorService.submit(ft5);
// Get execution results
System.out.println(ft1.get());
System.out.println(ft2.get());
System.out.println(ft3.get());
System.out.println(ft4.get());
System.out.println(ft5.get());
executorService.shutdown();
}
static class T1Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T1: Query flight information ...");
TimeUnit.MILLISECONDS.sleep(5000);
return " Flight information query succeeded ";
}
}
static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2: Check the information of the passengers ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Flight attendant information query succeeded ";
}
}
static class T3Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T3: Query contact information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Contact information query succeeded ";
}
}
static class T4Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T4: Query the refund and change information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Refund and change information query succeeded ";
}
}
static class T5Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T5: Query other information ...");
TimeUnit.MILLISECONDS.sleep(50);
return " Other information query succeeded ";
}
}
}

Future The limitations of
get()- Multitasking concurrently
- Future Provided only get() Method to get the result , And it's blocked . therefore , There is no other way but to wait for you .
- Cannot chain call multiple tasks
- If you want to perform a specific action after the calculation task is completed , E-mail , but Future But it doesn't provide such ability .
- Cannot combine multiple tasks
- If you run 10 A mission , And expect to perform specific actions after they are all executed , So in Future There's nothing I can do about it .
- No exception handling
- Future There are no exception handling methods in the interface .
CompletionService
Construction method
ExecutorCompletionService(Executor executor)
ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
Common methods

submit
- To submit a Callable perhaps Runnable Type of task , And back to Future.
take
- Blocking method , Get and remove the result of a completed task from the result queue , If not, it will block , Until a task is completed, the result is returned .
poll
- Get and remove the result of a completed task from the result queue , If not, it will return null, This method doesn't block .
Use scenarios
public class CompletionServiceDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// Creating a thread pool
ExecutorService executor = Executors.newFixedThreadPool(10);
// establish CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// Asynchronous to e-commerce S1 inquiry
cs.submit(() -> getPriceByS1());
// Asynchronous to e-commerce S2 inquiry
cs.submit(() -> getPriceByS2());
// Asynchronous to e-commerce S3 inquiry
cs.submit(() -> getPriceByS3());
// Asynchronously save the inquiry result to the database
for (int i = 0; i < 3; i++) {
// Get from blocking queue futureTask
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
executor.shutdown();
}
private static void save(Integer r) {
System.out.println(" Save inquiry results :" + r);
}
private static Integer getPriceByS1() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println(" Online retailers S1 Inquiry information 1200");
return 1200;
}
private static Integer getPriceByS2() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(8000);
System.out.println(" Online retailers S2 Inquiry information 1000");
return 1000;
}
private static Integer getPriceByS3() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(3000);
System.out.println(" Online retailers S3 Inquiry information 800");
return 800;
}
}

Realization principle
QueueingFuture
QueueingFutureprivate static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
// rewrite FutureTask Of done Method , After the Executor The result of the calculation is put into BlockingQueue in
protected void done() {
completionQueue.add(task);
}
}
ExecutorService and CompletionService
Futureget()get()FutureFutureFuture
Performance optimization practice
- Isolate the problems that need to be solved (60~120 individual , The average is about 80 About two topics );
- Analyze and deal with the problem , Download the pictures in the title to the local , Then call the third party tool generation. PDF file ( It takes about 3~10 second );
- take PDF Upload document to OSS Cloud space for storage ( It takes about 1~3 second );
- Provide the document address for users to download and print .
4~13s8sfor/**
* Pending document information as needed
*/
public class PDFDocVO {
// Name of document to be processed
private String docName;
// Omit others
public String getDocName() {
return docName;
}
public void setDocName(String docName) {
this.docName = docName;
}
}
public class ProduceDocService {
/**
* Process the pending document as a local actual document
*
* @param doc
* @return
*/
public static String makePDF(PDFDocVO doc) throws InterruptedException {
// use sleep Time consuming range of simulated document generation 3~10s
Random r = new Random();
Thread.sleep(3000 + r.nextInt(7000));
return "local" + doc.getDocName();
}
}
public class UploadDocService {
/**
* Analog upload
*
* @param localName Where the actual document is stored locally
* @return oss File path
* @throws InterruptedException
*/
public static String upload(String localName) throws InterruptedException {
// use sleep Time consuming range of simulated document generation 1~3s
Random r = new Random();
Thread.sleep(1000 + r.nextInt(2000));
return "https://aliyun.oss.xxx/file/" + localName;
}
}
Serial
import java.util.ArrayList;
import java.util.List;
public class SerializeModel {
/**
* Eligible documents
*
* @param count
* @return
*/
public static List<PDFDocVO> getPDFDocList(int count) {
List<PDFDocVO> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
PDFDocVO doc1 = new PDFDocVO();
list.add(doc1);
}
return list;
}
public static void main(String[] args) throws InterruptedException {
List<PDFDocVO> docList = getPDFDocList(10);
// Starting time
long start = System.currentTimeMillis();
for (PDFDocVO doc : docList) {
// Generating documentation
String localName = ProduceDocService.makePDF(doc);
// Upload documents
UploadDocService.upload(localName);
}
long total = System.currentTimeMillis() - start;
System.out.println(" The total time is :" + total / 1000 + " second ," + total / 1000 / 60 + " minute ");
}
}

82/10=8.2sparallel
parallel + asynchronous
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* TODO
*
* @date 2022/4/7 22:08
*/
public class ParallelAsyncModel {
// Thread pool threads
public final static int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
// Thread pool for processing document generation IO Intensive task
private static ExecutorService makeDocService = Executors.newFixedThreadPool(THREAD_COUNT * 2);
// Thread pool for processing document upload
private static ExecutorService uploadDocService = Executors.newFixedThreadPool(THREAD_COUNT * 2);
// Document generation queue
private static CompletionService<String> makeDocCompletionService = new ExecutorCompletionService(makeDocService);
// Document upload queue
private static CompletionService<String> uploadDocCompletionService = new ExecutorCompletionService(uploadDocService);
/**
* Eligible documents
*
* @param count
* @return
*/
public static List<PDFDocVO> getPDFDocList(int count) {
List<PDFDocVO> list = new ArrayList<>();
for (int i = 0; i < count; i++) {
PDFDocVO doc1 = new PDFDocVO();
list.add(doc1);
}
return list;
}
public static void main(String[] args) throws InterruptedException {
int count = 100;
List<PDFDocVO> docList = getPDFDocList(count);
// Starting time
long start = System.currentTimeMillis();
// Multithreading document generation
for (PDFDocVO doc : docList) {
makeDocCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String localName = ProduceDocService.makePDF(doc);
return localName;
}
});
}
// Upload documents
for (int i = 0; i < count; i++) {
Future<String> take = makeDocCompletionService.take();
uploadDocCompletionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String uploadUrl = UploadDocService.upload(take.get());
return uploadUrl;
}
});
}
long total = System.currentTimeMillis() - start;
System.out.println(" The total time is :" + total / 1000 + " second ," + total / 1000 / 60 + " minute ");
}
}

32/100=0.32s8.2/0.32 ≈ 25 times Thread number setting
- IO Intensive task , The number of threads is
CPU The core number *2;
- CPU Intensive task , The number of threads is
CPU The core number +1.
CPU The core number *2CPU The core number *41:41:3CPU The core number *4*3
summary
边栏推荐
- 通俗易懂理解樸素貝葉斯分類的拉普拉斯平滑
- JS file upload and download
- 【TcaplusDB知识库】Tmonitor单机安装指引介绍(一)
- C language learning day_ 06
- Advantages and disadvantages of distributed file storage system
- 别再用 System.currentTimeMillis() 统计耗时了,太 Low,StopWatch 好用到爆!
- Review of last week's hot spots (6.20-6.26)
- Ubuntu manually installing MySQL
- R language plot visualization: visualize the normalized histograms of multiple data sets, add density curve KDE to the histograms, set different histograms to use different bin sizes, and add edge whi
- 【TcaplusDB知识库】TcaplusDB机型管理介绍
猜你喜欢

2-4Kali下安装nessus

CPU design (single cycle and pipeline)

浅析基于边缘计算的移动AR实现(中)

【TcaplusDB知识库】TcaplusDB Tmonitor模块架构介绍

When does the mobile phone video roll off?

邮件系统(基于SMTP协议和POP3协议-C语言实现)
![[hcie-rs review mind map] - STP](/img/b5/b89e59fe7f23bf23feeadb991acba7.png)
[hcie-rs review mind map] - STP

细说物体检测中的Anchors

Comparison between new and old interfaces

The tutor invites you to continue your doctoral study with him. Will you agree immediately?
随机推荐
Privacy computing fat offline prediction
【TcaplusDB知识库】TcaplusDB机器初始化和上架介绍
Support system of softswitch call center system
CPU设计(单周期和流水线)
Flutter wechat sharing
闭包的常见问题
Reorganize common shell scripts for operation and maintenance frontline work
Your brain is learning automatically when you sleep! Here comes the first human experimental evidence: accelerate playback 1-4 times, and the effect of deep sleep stage is the best
C语言学习-Day_05
2021 CSP J2 entry group csp-s2 improvement group round 2 video and question solution
R语言使用econocharts包创建微观经济或宏观经济图、demand函数可视化需求曲线(demand curve)、自定义配置demand函数的参数丰富可视化效果
Une compréhension facile de la simplicité de la classification bayésienne du lissage laplacien
以后发现漏洞,禁止告诉中国!
Future & CompletionService
通俗易懂理解樸素貝葉斯分類的拉普拉斯平滑
实验笔记之——CARMEN (.log .clf)文件转换为rosbag
Frequently asked questions about closures
通俗易懂理解朴素贝叶斯分类的拉普拉斯平滑
Test how students participate in codereview
Use aspese Cells convert Excel to PDF