当前位置:网站首页>Thread safety and its implementation

Thread safety and its implementation

2022-06-24 06:41:00 Gu Dong

Thread safety

Definition of thread safety

When multiple threads access an object at the same time , If you don 't have to consider the scheduling and alternate execution of these threads in the runtime environment , No additional synchronization is required , Or any other coordination operation at the caller , The behavior of calling this object can get the correct result , This object is called thread safe .

Classification of thread safety

  • immutable

    As long as an immutable object is properly constructed ( It didn't happen this Quote escape ), The external visible state will never change , You will never see it in an inconsistent state among multiple threads . For a basic data type , Use... When defining final Keyword modification ensures that it is immutable .

  • Absolute thread safety

    No matter what the runtime environment is , The caller doesn't need any extra synchronization .

  • Relative thread safety

    Thread safety in the usual sense , It needs to ensure that a single operation of this object is thread safe , There is no need to take additional safeguard measures when invoking , But for some specific sequential calls , Additional synchronization measures need to be used on the caller side to ensure the correctness of the call .

  • Thread compatibility

    It means that the object itself is not thread safe , But it can be used safely in the concurrent environment by using synchronization measures correctly on the caller side .

  • Thread opposition

    It means whether or not the calling end adopts synchronization measures , You can't use code concurrently in a multithreaded environment .

Add :this Quote escape

Escape analysis is aimed at memory escape , When an object is defined in the method body , Quoted elsewhere in some way , May lead to GC when , Cannot recycle immediately , Thus causing memory escape . If referenced by an external method , For example, it is passed to other methods as call parameters , This is called method escape ; If accessed by an external thread , For example, assigning values to instance variables that can be accessed in other threads , This is called thread escape .

What is? this Quote escape

Before the constructor is finished ( Before instance initialization ), Put oneself this References are thrown out and copied by other threads , It may cause other threads to access " Half of the objects are initialized "( That is, objects that have not yet been initialized ), Will have an impact .

Escape scene

Scene one

A new thread is started in the constructor ( The new thread has this quote )

import java.text.SimpleDateFormat;
import java.util.Date;

public class EscapeForThis {
    

    int a;
    int b = 0;

    public EscapeForThis() {
    
        a = 1;
        //  Create a new thread in the constructor ( Have this quote ), Access member variables 
        new Thread(new Runnable() {
    

            @Override
            public void run() {
    
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + a + ",b=" + b);
            }
        }).run();
        b = 1;
    }

    public static void main(String[] args) {
    
        EscapeForThis s = new EscapeForThis();
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + s.a + ",b=" + s.b);
    }
}

Execution results s

[12:04:55--new] a=1,b=0
[12:04:55--main] a=1,b=1

The new thread is accessing b When b Initialization has not been completed , Not accessing the correct data .

Scene two

The inner class uses the outer class in the constructor : Inner classes can unconditionally access outer classes ( Automatically hold the external class this quote ), When the inner class is published , That is, the external class this References have also been published , There is no guarantee that the external class has been initialized .

External class EscapeForThis, Inner class EventListener

/** *  Event listener interface , Call the event handler  */
public interface EventListener {
    
    /** *  Event handling method  */
    void doEvent(Object obj);
}

demo

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class EscapeForThis {
    

  	private final int a;
    private final String b;

    private EscapeForThis(EventSource<EventListener> source) throws InterruptedException {
    
        a = 1;
        source.registerListener(o -> System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + a + ",b=" + EscapeForThis.this.b));
        //  For demonstration 
        TimeUnit.SECONDS.sleep(2);
        b = "xx";
    }

    static class EventSource<T> {
    
        private final List<T> eventListeners;

        public EventSource() {
    
            eventListeners = new ArrayList<T>();
        }

        /**  Register listener */
        public synchronized void registerListener(T eventListener) {
      // The array holds references to the incoming object 
            this.eventListeners.add(eventListener);
            this.notifyAll();
        }

        /**  Get event listeners */
        public synchronized List<T> retrieveListeners() throws InterruptedException {
      // Get the array that holds the object reference 
            List<T> dest = null;
            if (eventListeners.size() <= 0) {
    
                this.wait();
            }
            dest = new ArrayList<T>(eventListeners.size());  // Why create a new array here , Where are the benefits ? Prevent objects from escaping , At the same time, reduce the amount of eventListeners competition 
            dest.addAll(eventListeners);
            return dest;
        }
    }

    /** Create a new execution thread */
    static class ListenerRunnable implements Runnable {
    
        private final EventSource<EventListener> source;

        public ListenerRunnable(EventSource<EventListener> source) {
    
            this.source = source;
        }

        @Override
        public void run() {
    
            List<EventListener> listeners = null;

            try {
    
                //  Get event listeners 
                listeners = this.source.retrieveListeners();
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            assert listeners != null;
            for (EventListener listener : listeners) {
    
                listener.doEvent(new Object());  // Execute the method of the inner class to get the member variables of the outer class 
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
    
        EventSource<EventListener> s = new EventSource<>();
        ListenerRunnable runnable = new ListenerRunnable(s);
        Thread thread = new Thread(runnable);
        thread.start();
        EscapeForThis escapeForThis = new EscapeForThis(s);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + escapeForThis.a + ",b=" + escapeForThis.b);
    }

}

Execution results

[15:34:39--Thread-0] a=1,b=null
[15:34:41--main] a=1,b=xx

When initializing a member variable in an external class Dormant 2s, To highlight this Quote the effect of escape .

formation this Quote the condition of escape

  • One is to create an inner class in the constructor , And the inner class is published in the constructor . It can be avoided by breaking external conditions this Quote escape .

Solution

  1. Do not start the thread in the constructor , You can start a thread externally with a single method .

    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    public class EscapeForThis {
          
    
        int a;
        int b = 0;
    
    	private Thread t;
        public EscapeForThis() {
          
            a = 1;
            //  Create a new thread in the constructor ( Have this quote ), Access member variables 
            t= new Thread(new Runnable() {
          
    
                @Override
                public void run() {
          
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + a + ",b=" + b);
                }
            });
            b = 1;
        }
        
        public void initStart() {
          
            t.start();
        }
        
        public static void main(String[] args) {
          
            EscapeForThis s = new EscapeForThis();
            s.initStart();
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + s.a + ",b=" + s.b);
        }
    }
    
  2. Do not wait until the external class initialization is complete , Publish internal classes .

    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class EscapeForThis {
          
        private final int a;
        private final String b;
        private final EventListener listener;
    
        private EscapeForThis(){
          
            a = 1;
            listener = new EventListener(){
          
                @Override
                public void doEvent(Object obj) {
          
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + a + ",b=" + EscapeForThis.this.b);
                }
            };
            b = "xx";
        }
    
        public static EscapeForThis getInstance(EventSource<EventListener> source) {
          
            EscapeForThis safe = new EscapeForThis();  // Initialize first 
            source.registerListener(safe.listener);  // Publish internal classes 
            return safe;
        }
    
        static class EventSource<T> {
          
            private final List<T> eventListeners;
    
            public EventSource() {
          
                eventListeners = new ArrayList<T>();
            }
    
            /**  Register listener */
            public synchronized void registerListener(T eventListener) {
            // The array holds references to the incoming object 
                this.eventListeners.add(eventListener);
                this.notifyAll();
            }
    
            /**  Get event listeners */
            public synchronized List<T> retrieveListeners() throws InterruptedException {
            // Get the array that holds the object reference 
                List<T> dest = null;
                if (eventListeners.size() <= 0) {
          
                    this.wait();
                }
                dest = new ArrayList<T>(eventListeners.size());  // Why create a new array here , Where are the benefits ? Prevent objects from escaping , At the same time, reduce the amount of eventListeners competition 
                dest.addAll(eventListeners);
                return dest;
            }
        }
    
        /** Create a new execution thread */
        static class ListenerRunnable implements Runnable {
          
            private final EventSource<EventListener> source;
    
            public ListenerRunnable(EventSource<EventListener> source) {
          
                this.source = source;
            }
    
            @Override
            public void run() {
          
                List<EventListener> listeners = null;
    
                try {
          
                    //  Get event listeners 
                    listeners = this.source.retrieveListeners();
                } catch (InterruptedException e) {
          
                    e.printStackTrace();
                }
                assert listeners != null;
                for (EventListener listener : listeners) {
          
                    listener.doEvent(new Object());  // Execute the method of the inner class to get the member variables of the outer class 
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
          
            EventSource<EventListener> s = new EventSource<>();
            ListenerRunnable runnable = new ListenerRunnable(s);
            Thread thread = new Thread(runnable);
            thread.start();
            EscapeForThis escapeForThis = EscapeForThis.getInstance(s);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] a=" + escapeForThis.a + ",b=" + escapeForThis.b);
        }
    
    }
    

Thread safe implementation

Mutually exclusive synchronization ( Blocking synchronization )

Synchronization measures with pessimistic concurrency strategy , Whether there is competition or not , Follow the lock operation .

Synchronization is when multiple threads access data concurrently , Ensure that shared data is used by only one thread at the same time . Mutual exclusion is a means to achieve synchronization , A critical region 、 The mutex 、 Semaphores are common mutually exclusive implementations .

Synchronization means synchronized keyword 、Lock Interface .

Thread blocking and wakeup require additional performance overhead .

Implementation examples

public class Monitor {
    

    private int a = 0;

    public synchronized void writer(String s) {
         // 1
        System.out.println(s);                      // 2
        a++;                                        // 3
    }                                               // 4

    public synchronized void reader(String s) {
        // 5
        System.out.println(s);                     // 6
        int i = a;                                 // 7
        System.out.println(i);                     // 8
    }                                              // 9
}
public class Client {
    

    public static void main(String[] args) {
    
        Monitor monitor = new Monitor();

        new Thread(() -> {
    
            monitor.reader("Thread 0");
        }).start();

        new Thread(() -> {
    
            monitor.writer("Thread 1");
        }).start();

        new Thread(() -> {
    
            monitor.reader("Thread 2");
        }).start();
    }
}

Nonblocking synchronization

Synchronization measures with optimistic concurrency strategy , Do it first , In case of conflict, perform compensation operation .

Hardware instruction set dependent , Common instructions :

  • Test and set up (Test-and-Set)
  • Get and add (Fetch-and-Increment)
  • In exchange for (Swap)
  • Compare and exchange (Compare-and-Swap), Common CAS
  • Load connection / Conditional storage (Load-Linked/Store-Conditional)

stay JDK Use in Unsafe class CAS.

Implementation examples

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicTest {
    

    private static final int THREAD_COUNT = 20;

    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(THREAD_COUNT);

    public static final AtomicInteger RACE = new AtomicInteger(0);

    private static final ThreadPoolExecutor POOL_EXECUTOR = initThreadPool(THREAD_COUNT, THREAD_COUNT, 1000);

    /** *  The worker thread  */
    public static class WorkerThreadFactory implements ThreadFactory {
    
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        WorkerThreadFactory(String whatFeatureOfGroup) {
    
            this.namePrefix = "From WorkerThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        }

        @Override
        public Thread newThread(Runnable task) {
    
            String name = namePrefix + nextId.getAndIncrement();
            return new Thread(null, task, name, 0);
        }
    }

    /** *  Initializes the thread pool  */
    public static ThreadPoolExecutor initThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime) {
    
        return new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                new WorkerThreadFactory("AtomicTest"),
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void increase() {
    
        // incrementAndGet For internal use sun.misc.Unsafe Of compareAndSwapInt Realized 
        RACE.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
    

        for (int i = 0; i < THREAD_COUNT; i++) {
    
            int finalI = i;
            POOL_EXECUTOR.execute(() -> {
    
                try {
    
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + finalI + "start ...");
                    for (int j = 0; j < 20000; j++) {
    
                        increase();
                    }
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + finalI + "end ...");
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                } finally {
    
                    COUNT_DOWN_LATCH.countDown();
                }
            });
        }
        if (COUNT_DOWN_LATCH.await(2, TimeUnit.MINUTES)) {
    
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] RACE = " + RACE);
        }

        POOL_EXECUTOR.shutdown();
    }
}

Execution results

[17:28:12:012--From WorkerThreadFactory's AtomicTest-Worker-18] 17start ...
[17:28:12:012--From WorkerThreadFactory's AtomicTest-Worker-2] 1start ...
...
[17:28:12:012--From WorkerThreadFactory's AtomicTest-Worker-2] 1end ....
[17:28:12:012--From WorkerThreadFactory's AtomicTest-Worker-18] 17end ...
...
[17:28:12:012--main] RACE=400000

No synchronization scheme

If a method does not involve sharing data , There is no need for synchronization measures to ensure its correctness .

Reentrant code , It can be interrupted at any time during code execution , Instead, execute the other end of the code ( Include recursion ), And after the return of control , The original program won't make any mistakes , It doesn't affect the result .

Thread local storage (Thread Local Storage): If the data needed in a piece of code must be shared with other codes , Let's see if the code sharing data is guaranteed to execute in the same thread . If you can guarantee , We can limit the visible range of shared data to the same thread , It can ensure that there is no data contention between threads without synchronization . Implementation class ThreadLocal.


Reference article

《 In depth understanding of JAVA virtual machine 》 The third edition Zhou Zhiming Edition

原网站

版权声明
本文为[Gu Dong]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/175/202206240009277748.html