Java Concurrency in Practice: The Basics

I. Thread Safety

1.1 What Is Thread Safety?

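Informally, a class is thread-safe if it behaves correctly when accessed from multiple threads, regardless of how those threads are scheduled or interleaved, and callers need no extra synchronization or coordination. The definition in Java Concurrency in Practice reads: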

A class is thread safe if it behaves correctly when accessed from multiple threads, regardless of the scheduling or interleaving of the execution of those threads by the runtime environment, and with no additional synchronization or other coordination on the part of the calling code.

At its core, thread safety is about managing access to state, in particular shared, mutable state.

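What is an object's state? It is the object's data: the values stored in its instance or static fields, plus the state of other objects it depends on. For example, a HashMap's state is stored partly in the HashMap object itself and partly in its Map.Entry objects.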

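What if an object has no state at all? StatelessFactorizer, for example, has no fields and references no state in other objects; everything it computes lives in local variables on the calling thread's stack. Stateless objects are always thread-safe.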

@ThreadSafe
public class StatelessFactorizer implements Servlet {
    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = factor(i);
        encodeIntoResponse(resp, factors);
    }
}
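Outside a servlet container the same idea can be tried directly. In this minimal sketch, StatelessFactorizerSketch, factor and factorConcurrently are hypothetical names, and simple trial division stands in for the book's factor(i). The class has no fields, so each call works only with its own local variables and concurrent callers cannot interfere:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for StatelessFactorizer: no fields at all.
class StatelessFactorizerSketch {
    // Every variable here is local, so each call works on its own stack frame.
    static List<BigInteger> factor(BigInteger n) {
        List<BigInteger> factors = new ArrayList<>();
        BigInteger d = BigInteger.valueOf(2);
        while (d.multiply(d).compareTo(n) <= 0) {
            while (n.mod(d).signum() == 0) {
                factors.add(d);
                n = n.divide(d);
            }
            d = d.add(BigInteger.ONE);
        }
        if (n.compareTo(BigInteger.ONE) > 0)
            factors.add(n);
        return factors;
    }

    // Calls factor(n) from several threads at once and collects every result.
    static List<List<BigInteger>> factorConcurrently(BigInteger n, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<BigInteger>>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++)
                futures.add(pool.submit(() -> factor(n)));
            List<List<BigInteger>> results = new ArrayList<>();
            for (Future<List<BigInteger>> f : futures)
                results.add(f.get());
            return results;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```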

1.2 Atomicity

Now suppose we add a hit counter to the stateless StatelessFactorizer. That introduces a piece of shared, mutable state, count:

@NotThreadSafe
public class UnsafeCountingFactorizer implements Servlet {
    private long count = 0;
    public long getCount() { return count; }
    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = factor(i);
        ++count;
        encodeIntoResponse(resp, factors);
    }
}

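This code has a race condition. ++count looks like a single operation, but it is really a read-modify-write sequence: read the current value, add one, and write the result back. If two threads interleave these steps, both can read the same value and one increment is lost. Read-modify-write is a classic kind of race condition.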

( A race condition occurs when the correctness of a computation depends on the relative timing or interleaving of multiple threads by the runtime; in other words, when getting the right answer relies on lucky timing )

To make the program correct, the read-modify-write must be atomic. Replacing the long field count with an AtomicLong achieves this, as in CountingFactorizer:

@ThreadSafe
public class CountingFactorizer implements Servlet {
    private final AtomicLong count = new AtomicLong(0);
    
    public long getCount() { return count.get(); }
    
    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = factor(i);
        count.incrementAndGet();
        encodeIntoResponse(resp, factors);
    }
}
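The lost-update effect can be observed with a small experiment. In this sketch (CounterRace and run are hypothetical names), several threads increment both a plain long, as UnsafeCountingFactorizer does, and an AtomicLong, as CountingFactorizer does:

```java
import java.util.concurrent.atomic.AtomicLong;

// Compares a plain long counter (++ is read-modify-write) with AtomicLong.
class CounterRace {
    private long unsafeCount = 0;                          // like UnsafeCountingFactorizer
    private final AtomicLong safeCount = new AtomicLong(); // like CountingFactorizer

    // Each of `threads` threads increments both counters `perThread` times.
    // Returns {plain total, atomic total}.
    static long[] run(int threads, int perThread) {
        CounterRace c = new CounterRace();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    c.unsafeCount++;               // read, add one, write back: not atomic
                    c.safeCount.incrementAndGet(); // one atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // join() makes the workers' writes visible here, so reading unsafeCount is safe now.
        return new long[] { c.unsafeCount, c.safeCount.get() };
    }
}
```

On a multicore machine the plain counter often ends up below the expected total, and the shortfall varies from run to run; the AtomicLong total is always exact.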

1.3 Locking

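Now suppose we want to improve performance by caching the most recent result, lastNumber and lastFactors. Could we simply use two Atomic* variables (for example, two AtomicReferences)? No. There is an invariant here: lastFactors must be the factors of lastNumber, so the two must be updated together and always stay consistent. Each atomic variable is thread-safe on its own, but two separate atomic updates are not atomic as a pair. The simplest solution is the intrinsic lock, synchronized, with lastNumber and lastFactors guarded by the same lock.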

@ThreadSafe
public class SynchronizedFactorizer implements Servlet {
    @GuardedBy("this") private BigInteger lastNumber;
    @GuardedBy("this") private BigInteger[] lastFactors;
    
    public synchronized void service(ServletRequest req,
                                     ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        if (i.equals(lastNumber))
            encodeIntoResponse(resp, lastFactors);
        else {
            BigInteger[] factors = factor(i);
            lastNumber = i;
            lastFactors = factors;
            encodeIntoResponse(resp, factors);
        } 
    }
}
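For contrast, here is roughly what the rejected two-atomic-variable design could look like. TwoAtomicsCache and its trial-division factor helper are hypothetical, and the class is deliberately NOT thread-safe: it behaves correctly when called from one thread, and the comments mark where concurrent callers can break the invariant:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// NOT thread-safe: sketch of the rejected design with two independent atomic variables.
class TwoAtomicsCache {
    private final AtomicReference<BigInteger> lastNumber = new AtomicReference<>();
    private final AtomicReference<BigInteger[]> lastFactors = new AtomicReference<>();

    BigInteger[] service(BigInteger i) {
        if (i.equals(lastNumber.get()))
            // Another thread may have set lastNumber but not yet lastFactors,
            // so this can return the factors of a different number (or null).
            return lastFactors.get();
        BigInteger[] factors = factor(i);
        lastNumber.set(i);        // each set() is atomic by itself, but between these
        lastFactors.set(factors); // two calls the invariant is temporarily broken
        return factors;
    }

    // Hypothetical trial-division helper standing in for factor(i).
    static BigInteger[] factor(BigInteger n) {
        List<BigInteger> out = new ArrayList<>();
        for (BigInteger d = BigInteger.valueOf(2); d.multiply(d).compareTo(n) <= 0; d = d.add(BigInteger.ONE)) {
            while (n.mod(d).signum() == 0) {
                out.add(d);
                n = n.divide(d);
            }
        }
        if (n.compareTo(BigInteger.ONE) > 0)
            out.add(n);
        return out.toArray(new BigInteger[0]);
    }
}
```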

1.4 Liveness and Performance

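SynchronizedFactorizer works correctly and is thread-safe. But recall the original goal: the cache was meant to improve performance, yet the current synchronization policy makes all requests run one at a time. If some part of the work, such as factor(i), takes a long time, overall throughput suffers. Since factor(i) does not access any shared, mutable state, we can narrow the synchronized blocks so that it runs outside the lock, as in CachedFactorizer: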

@ThreadSafe
public class CachedFactorizer implements Servlet {
    @GuardedBy("this") private BigInteger lastNumber;
    @GuardedBy("this") private BigInteger[] lastFactors;
    
    @GuardedBy("this") private long hits;
    @GuardedBy("this") private long cacheHits;
    
    public synchronized long getHits() { return hits; }
    
    public synchronized double getCacheHitRatio() {
        return (double) cacheHits / (double) hits;
    }
    
    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = null;
        synchronized (this) {
            ++hits;
            if (i.equals(lastNumber)) {
                ++cacheHits;
                factors = lastFactors.clone();
            }
        }
        if (factors == null) {
            factors = factor(i);
            synchronized (this)  {
                lastNumber = i;
                lastFactors = factors.clone();
            }
        }
        encodeIntoResponse(resp, factors);
    }
}

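Sizing synchronized blocks is a trade-off among safety, simplicity (synchronizing the whole method) and concurrency (synchronizing as little code as possible). Acquiring and releasing a lock has its own cost, so splitting blocks too finely (for example, giving ++hits its own synchronized block) is not a good idea either, and it also hurts readability. As a rule of thumb, move long-running operations that do not touch shared, mutable state (such as network or I/O operations) out of the synchronized blocks.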
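The pattern in CachedFactorizer (check under the lock, compute without it, update under the lock) can be sketched generically. OneEntryCache is a hypothetical class; it assumes the cached values are immutable (otherwise they need defensive copies, which is what the clone() calls in CachedFactorizer provide) and that compute does not touch shared state. As in CachedFactorizer, two threads may occasionally compute the same key at once; that wastes work but does not break correctness:

```java
import java.util.function.Function;

// Hypothetical generic version of CachedFactorizer's locking pattern:
// hold the lock only to read or update the cached entry, never while computing.
class OneEntryCache<K, V> {
    private final Function<K, V> compute; // assumed slow and free of shared state
    private K lastKey;                    // guarded by this
    private V lastValue;                  // guarded by this
    private long hits, cacheHits;         // guarded by this

    OneEntryCache(Function<K, V> compute) { this.compute = compute; }

    V get(K key) {
        synchronized (this) {             // short critical section 1: check the cache
            ++hits;
            if (key.equals(lastKey)) {
                ++cacheHits;
                return lastValue;
            }
        }
        V value = compute.apply(key);     // long-running work, no lock held
        synchronized (this) {             // short critical section 2: update both fields together
            lastKey = key;
            lastValue = value;
        }
        return value;
    }

    synchronized double hitRatio() {
        return hits == 0 ? 0.0 : (double) cacheHits / (double) hits;
    }
}
```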

Analyze synchronization requirements along these lines:

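  • a. Identify the object's state;
  • b. Identify the invariants that constrain that state, for example the relationship between lastNumber and lastFactors in SynchronizedFactorizer;
  • c. Establish a synchronization policy for concurrent access to the object's state;
  • d. Improve flexibility and performance without sacrificing thread safety.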

II. Sharing and Publishing Objects

Part I was mainly about accessing shared, mutable state safely. This part introduces techniques for sharing and publishing objects so that multiple threads can use them safely.

2.1 Visibility

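In a multithreaded program, without proper synchronization there is no guarantee that one thread will see the values written by another, as NoVisibility shows: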

public class NoVisibility {
    private static boolean ready;
    private static int number;
    
    private static class ReaderThread extends Thread {
        public void run() {
            while (!ready)
                Thread.yield();
            System.out.println(number);
        }
    }
    public static void main(String[] args) {
        new ReaderThread().start();
        number = 42;
        ready = true;
    }
}

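Does it print 42, print 0, or print nothing at all? Any of these can happen. Without synchronization the reader thread may never see the write to ready and loop forever, or it may see ready become true while still reading the stale value 0 for number, because the two writes can be reordered.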
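One minimal fix is to declare ready volatile. The write to number happens-before the volatile write to ready, which happens-before any read that sees ready == true, so the reader is then guaranteed to print 42. VolatileVisibility and runOnce are hypothetical names for this sketch; guarding both fields with the same lock would work as well:

```java
// Sketch of a fixed NoVisibility: only `ready` changed, from plain to volatile.
class VolatileVisibility {
    private static volatile boolean ready;
    private static int number;

    // Starts a reader, publishes 42, and returns the value the reader saw.
    static int runOnce() {
        ready = false;
        number = 0;
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            while (!ready)          // volatile read: will eventually see `true`
                Thread.yield();
            seen[0] = number;       // guaranteed to see 42 once ready is seen as true
            System.out.println(number);
        });
        reader.start();
        number = 42;
        ready = true;               // volatile write publishes the earlier write to number
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```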

The volatile keyword

With respect to visibility, a volatile variable can roughly be thought of as behaving like SynchronizedInteger below:

@ThreadSafe
public class SynchronizedInteger {
    @GuardedBy("this") private int value;
    
    public synchronized int get() { return value; }
    public synchronized void set(int value) { this.value = value; }
}

Note: locking guarantees both atomicity and visibility; volatile variables guarantee only visibility.
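A typical case where visibility alone is enough is a stop flag written by one thread and read by another. In this sketch, StoppableWorker and demo are hypothetical names. The flag is volatile, so the worker is guaranteed to notice the request; the iterations field needs no volatile because only the worker thread touches it. volatile would not, however, make an operation like count++ atomic:

```java
// volatile is enough here: one thread writes the flag, another only reads it.
class StoppableWorker implements Runnable {
    private volatile boolean stopRequested; // visibility only
    private long iterations;                // touched only by the worker thread

    public void run() {
        while (!stopRequested)              // without volatile, this loop might never see the update
            iterations++;
    }

    void requestStop() { stopRequested = true; }

    // Starts a worker, lets it spin briefly, stops it, and reports whether it terminated.
    static boolean demo() {
        StoppableWorker w = new StoppableWorker();
        Thread t = new Thread(w);
        t.setDaemon(true);                  // never keep the JVM alive if something goes wrong
        t.start();
        try {
            Thread.sleep(50);
            w.requestStop();
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }
}
```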

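How do the intrinsic lock (synchronized) and java.util.concurrent.locks.Lock guarantee memory visibility?

For synchronized, visibility is guaranteed by the happens-before rules of the Java memory model as revised by JSR 133: an unlock of a monitor happens-before every subsequent lock of that same monitor, so whatever a thread wrote before releasing the lock is visible to the next thread that acquires it.

For java.util.concurrent.locks.Lock, the interface documentation requires implementations to provide the same memory synchronization semantics as the built-in monitor lock. ReentrantLock, for example, is built on AbstractQueuedSynchronizer, whose lock state is a volatile field updated through volatile reads, writes and CAS operations, so the guarantee ultimately rests on volatile semantics.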
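Using an explicit Lock looks like the sketch below. Because a successful lock() has the same memory effects as entering a synchronized block and unlock() the same as leaving one, the counter is both atomic and visible. LockedCounter and demo are hypothetical names; unlocking in finally is the standard idiom so the lock is released even if the body throws:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// A counter guarded by an explicit Lock instead of synchronized.
class LockedCounter {
    private final Lock lock = new ReentrantLock();
    private long value; // guarded by lock

    void increment() {
        lock.lock();
        try {
            value++;        // atomic and visible with respect to other lock holders
        } finally {
            lock.unlock();  // always release, even if the body throws
        }
    }

    long get() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Increments from several threads and returns the final value.
    static long demo(int threads, int perThread) {
        LockedCounter c = new LockedCounter();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    c.increment();
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c.get();
    }
}
```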

2.2 Publication and Escape

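While a shared object is being initialized, only the thread constructing it should be able to access it. Once initialization is complete, the object can be safely published (that is, made visible to other threads). The Java memory model (JMM) allows other threads to observe an object after its initialization has started but before it has finished, so programs must avoid publishing partially initialized objects. An object that is published when it should not have been is said to have escaped.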

2.2.1 Escape

Escape example 1

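ThisEscape shows a typical escape: the this reference escapes during construction. When ThisEscape publishes the EventListener, it implicitly publishes the ThisEscape instance as well, because an inner class instance holds a reference to its enclosing instance.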

public class ThisEscape {

  public ThisEscape(EventSource source) {
      source.registerListener(
        new EventListener() {
          public void onEvent(Event e) {
            doSomething(e);
          } 
      });
  }
}

SafeListener uses a private constructor and a factory method to prevent the this reference from escaping during construction.

public class SafeListener {
  private final EventListener listener;
  
  private SafeListener() {
    listener = new EventListener() {
      public void onEvent(Event e) {
        doSomething(e);
      } 
    };
  }
  
  public static SafeListener newInstance(EventSource source) {
    SafeListener safe = new SafeListener();
    source.registerListener(safe.listener);
    return safe;
  } 
}
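To run the factory-method approach, the event API has to be filled in. EventSource, the single-method EventListener interface and the String events below are hypothetical stand-ins for the types the text assumes; the point is that SafeListener registers its listener only after construction has finished:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal event API standing in for EventSource/EventListener in the text.
interface EventListener {
    void onEvent(String event);
}

class EventSource {
    private final List<EventListener> listeners = new ArrayList<>(); // guarded by this

    synchronized void registerListener(EventListener l) { listeners.add(l); }

    void fire(String event) {
        List<EventListener> snapshot;
        synchronized (this) { snapshot = new ArrayList<>(listeners); }
        for (EventListener l : snapshot)
            l.onEvent(event);
    }
}

class SafeListener {
    private final EventListener listener;
    private final List<String> received = new ArrayList<>(); // guarded by this

    private SafeListener() {
        // The lambda captures `this`, but nothing outside sees it yet.
        listener = e -> doSomething(e);
    }

    // Construct completely first, then publish the listener.
    static SafeListener newInstance(EventSource source) {
        SafeListener safe = new SafeListener();
        source.registerListener(safe.listener);
        return safe;
    }

    private synchronized void doSomething(String e) { received.add(e); }

    synchronized List<String> received() { return new ArrayList<>(received); }
}
```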

Escape example 2

StuffIntoPublic does not publish Holder safely. If a thread other than the publishing thread calls assertSanity, it may throw AssertionError!

public class Holder {
    private int n;
    public Holder(int n) { this.n = n; }
    public void assertSanity() {
        if (n != n)
          throw new AssertionError("This statement is false.");
    } 
}
public class StuffIntoPublic {
    // Unsafe publication
    public Holder holder;

    public void initialize() {
        holder = new Holder(42);
    }
}

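Note: the problem here is not Holder itself but the improper publication. However, if Holder were immutable (with n declared final), it could never throw AssertionError, even without safe publication.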
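A sketch of the immutable variant mentioned in the note: with n declared final, the JMM's initialization-safety guarantee applies, so any thread that sees a reference to the holder also sees n == 42. ImmutableHolder and StuffIntoPublicFixed are hypothetical names. The holder field is still published without synchronization, so another thread may not see the new reference at all (it may still read null), but it can never see a half-constructed object:

```java
// Holder with a final field: initialization safety applies.
final class ImmutableHolder {
    private final int n;

    ImmutableHolder(int n) { this.n = n; }

    int n() { return n; }

    void assertSanity() {
        if (n != n)   // cannot fail: a final field never changes after construction
            throw new AssertionError("This statement is false.");
    }
}

class StuffIntoPublicFixed {
    // Still a plain field: other threads may see null or an older reference,
    // but never an ImmutableHolder whose n is not yet 42.
    public ImmutableHolder holder;

    public void initialize() { holder = new ImmutableHolder(42); }
}
```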

Let us look more closely at why AssertionError can be thrown. Suppose the following two snippets run in two threads:

Thread 1:

holder = new Holder(42);

Thread 2:

holder.assertSanity(); // can throw

At a lower level, the code in thread 1 is essentially a sequence of memory allocations and writes:

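  1. Allocate memory for the new Holder (call its address pointer1); the field n starts at its default value 0;
  2. Write 42 at offset 0 of pointer1 (the field n);
  3. Assign pointer1 to holder.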

Because the Java memory model is weak, thread 2 may well observe these steps in this order:

  1. Allocate memory for the new Holder (pointer1);
  2. Assign pointer1 to holder;
  3. Write 42 at offset 0 of pointer1.

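In other words, thread 2 may call assertSanity before n has been set to 42. assertSanity reads n twice; if the first read happens before step 3 (seeing the default value 0) and the second after step 3 (seeing 42), the two values differ and the assertion error is thrown.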

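(Some sources claim that newer Java versions changed the memory model so that this cannot happen. In fact, the memory model as revised by JSR 133 (Java 5 and later) still permits this outcome for non-final fields; only final fields get the initialization-safety guarantee. Whether it actually shows up depends on the JVM and the hardware, so programs must still avoid publishing partially initialized objects.)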

Escape example 3

class Foo {
  private Helper helper;
 
  public Helper getHelper() {
    return helper;
  }
 
  public void initialize() {
    helper = new Helper(42);
  }
}
 
public class Helper {
  private int n;
 
  public Helper(int n) {
    this.n = n;
  }
  // ...
}

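If a thread calls getHelper() before initialize() has been called, it sees the uninitialized helper field (null). If one thread calls initialize() while another calls getHelper(), the second thread may observe any one of three states: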

  • helper is null;

  • a fully initialized Helper instance, with n equal to 42;

  • a partially initialized Helper instance, with n still at its default value 0.

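The JMM allows the compiler to allocate memory for the Helper instance, assign a reference to that memory to the helper field, and only then initialize the Helper instance. In other words, the compiler may reorder the write to the helper field with the initialization of the Helper instance (this.n = n); if the field write happens first, other threads can see a partially initialized Helper. There is also a separate problem: if several threads call initialize(), several Helper instances will be created. That is only a performance issue, because the unused instances will be garbage-collected.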

2.2.2 Safe Publication

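An object is safely published only if both the reference to it and its state become visible to other threads at the same time. A properly constructed object can be safely published in any of the following ways: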

https://wiki.sei.cmu.edu/confluence/display/java/TSM03-J.+Do+not+publish+partially+initialized+objects

  • synchronization
class Foo {
  private Helper helper;
 
  public synchronized Helper getHelper() {
    return helper;
  }
 
  public synchronized void initialize() {
    helper = new Helper(42);
  }
}
  • final
class Foo {
  private final Helper helper;
 
  public Helper getHelper() {
    return helper;
  }
 
  public Foo() {
    // Point 1
    helper = new Helper(42);
    // Point 2
  }
}
  • final field + thread-safe object
class Foo {
  private final Vector<Helper> helper;
 
  public Foo() {
    helper = new Vector<Helper>();
  }
 
  public Helper getHelper() {
    if (helper.isEmpty()) {
      initialize();
    }
    return helper.elementAt(0);
  }
 
  public synchronized void initialize() {
    if (helper.isEmpty()) {
      helper.add(new Helper(42));
    }
  }
}
  • static initialization
// Immutable Foo
final class Foo {
  private static final Helper helper = new Helper(42);
 
  public static Helper getHelper() {
    return helper;
  }
}
  • immutable object + volatile
class Foo {
  private volatile Helper helper;
 
  public Helper getHelper() {
    return helper;
  }
 
  public void initialize() {
    helper = new Helper(42);
  }
}
 
// Immutable Helper
public final class Helper {
  private final int n;
 
  public Helper(int n) {
    this.n = n;
  }
  // ...
}
  • thread-safe mutable object + volatile
class Foo {
  private volatile Helper helper;
 
  public Helper getHelper() {
    return helper;
  }
 
  public void initialize() {
    helper = new Helper(42);
  }
}
 
// Mutable but thread-safe Helper
public class Helper {
  private volatile int n;
  private final Object lock = new Object();
 
  public Helper(int n) {
    this.n = n;
 
  }
 
  public void setN(int value) {
    synchronized (lock) {
      n = value;
    }
  }
}
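The "immutable object + volatile" option can be exercised with a reader thread. In this sketch, ImmutableHelper and VolatilePublisher are hypothetical names for the Helper and Foo above, and demo is a hypothetical driver. The reader spins until it sees a non-null reference; the volatile write and read then guarantee that it sees the fully constructed state:

```java
// Immutable helper: all state is final and set in the constructor.
final class ImmutableHelper {
    private final int n;

    ImmutableHelper(int n) { this.n = n; }

    int n() { return n; }
}

class VolatilePublisher {
    private volatile ImmutableHelper helper;

    ImmutableHelper getHelper() { return helper; }

    void initialize() { helper = new ImmutableHelper(42); } // volatile write publishes safely

    // Returns the value of n observed by a separate reader thread.
    static int demo() {
        VolatilePublisher foo = new VolatilePublisher();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            ImmutableHelper h;
            while ((h = foo.getHelper()) == null) // volatile read: sees the write eventually
                Thread.yield();
            seen[0] = h.n();                       // fully constructed state is visible
        });
        reader.start();
        foo.initialize();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```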

2.3 Immutability

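As the code above shows, a thread that can see an object's reference cannot necessarily see the object's state; guaranteeing that requires synchronization. Immutable objects, however, can be accessed safely even when their reference is published without synchronization. Immutable objects are always thread-safe.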

  • Final Fields

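Make fields private unless they need greater visibility, and make fields final unless they need to be mutable. The fewer mutable fields an object has, the easier it is to reason about.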

Using final and volatile, the caching factorizer from Part I can be redesigned as follows:

@Immutable
class OneValueCache {
    private final BigInteger lastNumber;
    private final BigInteger[] lastFactors;
    
    public OneValueCache(BigInteger i, BigInteger[] factors) {
        lastNumber  = i;
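        // The initial cache is created as new OneValueCache(null, null), so allow null here
        lastFactors = (factors == null) ? null : Arrays.copyOf(factors, factors.length);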
    }
    
    public BigInteger[] getFactors(BigInteger i) {
        if (lastNumber == null || !lastNumber.equals(i))
          return null;
        else
          return Arrays.copyOf(lastFactors, lastFactors.length);
    } 
}
@ThreadSafe
public class VolatileCachedFactorizer implements Servlet {
    private volatile OneValueCache cache = new OneValueCache(null, null);
    
    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = cache.getFactors(i);
        if (factors == null) {
            factors = factor(i);
            cache = new OneValueCache(i, factors);
        }
        encodeIntoResponse(resp, factors);
    }
}

An object is immutable if:

  • Its state cannot be modified after construction;
  • All its fields are final; and
  • It is properly constructed (the this reference does not escape during construction).

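Immutable objects are so important that the Java memory model gives them a special guarantee of initialization safety: once a thread can see a reference to a properly constructed immutable object, it also sees the object's state (the values of its final fields), even without synchronization.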
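A self-contained version of OneValueCache and VolatileCachedFactorizer, without the Servlet API and with the null check the initial new OneValueCache(null, null) requires. The "Sketch" class names and the trial-division factor helper are hypothetical. Because the number and its factors always travel together in one immutable object, a single volatile write replaces both at once:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Immutable holder for one (number, factors) pair.
final class OneValueCacheSketch {
    private final BigInteger lastNumber;
    private final BigInteger[] lastFactors;

    OneValueCacheSketch(BigInteger i, BigInteger[] factors) {
        lastNumber = i;
        lastFactors = (factors == null) ? null : Arrays.copyOf(factors, factors.length);
    }

    BigInteger[] getFactors(BigInteger i) {
        if (lastNumber == null || !lastNumber.equals(i))
            return null;
        return Arrays.copyOf(lastFactors, lastFactors.length); // copy keeps the cache immutable
    }
}

class VolatileCachedFactorizerSketch {
    private volatile OneValueCacheSketch cache = new OneValueCacheSketch(null, null);

    BigInteger[] service(BigInteger i) {
        BigInteger[] factors = cache.getFactors(i);  // one volatile read, consistent pair
        if (factors == null) {
            factors = factor(i);
            cache = new OneValueCacheSketch(i, factors); // one volatile write replaces both values
        }
        return factors;
    }

    // Hypothetical trial-division helper standing in for factor(i).
    static BigInteger[] factor(BigInteger n) {
        List<BigInteger> out = new ArrayList<>();
        for (BigInteger d = BigInteger.valueOf(2); d.multiply(d).compareTo(n) <= 0; d = d.add(BigInteger.ONE)) {
            while (n.mod(d).signum() == 0) {
                out.add(d);
                n = n.divide(d);
            }
        }
        if (n.compareTo(BigInteger.ONE) > 0)
            out.add(n);
        return out.toArray(new BigInteger[0]);
    }
}
```

If two threads miss the cache at the same time, both compute and both replace the cache; the last write wins, and every reader still sees a consistent pair.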

2.4 Effectively Immutable Objects

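Objects that are not technically immutable, but whose state will not be modified after publication, are called effectively immutable. Even though they do not meet the strict conditions above, they can be treated as immutable once they have been safely published. For example, Date is mutable, but if it is used only to record each user's last login time and never modified after being stored, it can be treated as immutable: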

public Map<String, Date> lastLogin =
    Collections.synchronizedMap(new HashMap<String, Date>());
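A sketch of how such a map might be used: each Date is created, safely published by putting it into the synchronized map, and never modified afterward, so readers can use it without further locking. LoginTracker, recordLogin and lastLoginMillis are hypothetical names; the pattern relies entirely on the convention that no code ever calls setTime on a stored Date:

```java
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

class LoginTracker {
    // The synchronized map provides safe publication of each Date.
    final Map<String, Date> lastLogin =
        Collections.synchronizedMap(new HashMap<String, Date>());

    void recordLogin(String user, long epochMillis) {
        // A fresh Date per login: the old Date is replaced, never mutated.
        lastLogin.put(user, new Date(epochMillis));
    }

    long lastLoginMillis(String user) {
        Date d = lastLogin.get(user);
        return d == null ? -1L : d.getTime(); // read-only use of an effectively immutable Date
    }
}
```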

2.5 Thread Confinement

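Accessing shared, mutable state requires synchronization. If the data is not shared, so that an object is accessed by only one thread, no synchronization is needed. The Java language has no mechanism that enforces confining an object to a thread; confinement has to be ensured by the program's design and implementation. Java does, however, provide language and library features that help, such as local variables and the ThreadLocal class.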

  • JDBC

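    Connection objects are not required to be thread-safe, because a connection pool hands a given connection to only one thread at a time, and that thread uses it exclusively until it returns the connection to the pool.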

  • Stack confinement

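a. Primitive local variables (primitively typed local variables), such as numPairs in loadTheArk, are always confined to the executing thread, because there is no way to obtain a reference to a primitive local variable;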

b. Object references, such as animals in loadTheArk, are confined only as long as the reference is never allowed to escape.

public int loadTheArk(Collection<Animal> candidates) {
    SortedSet<Animal> animals;
    int numPairs = 0;
    Animal candidate = null;
    // animals confined to method, don't let them escape!
    animals = new TreeSet<Animal>(new SpeciesGenderComparator());
    animals.addAll(candidates);
    for (Animal a : animals) {
        if (candidate == null || !candidate.isPotentialMate(a))
            candidate = a;
        else {
            ark.load(new AnimalPair(candidate, a));
            ++numPairs;
            candidate = null;
        } 
    }
    return numPairs;
}
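A smaller, self-contained illustration of stack confinement, since Animal, ark and SpeciesGenderComparator are not defined here. The class and method names are hypothetical; countMatchingPairs pairs up equal strings much as loadTheArk pairs up mates. The sorted copy and the counter are local, so the method can be called from any number of threads at once, provided nobody modifies the input collection concurrently:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class StackConfinementSketch {
    static int countMatchingPairs(Collection<String> candidates) {
        List<String> sorted = new ArrayList<>(candidates); // local copy, never escapes
        Collections.sort(sorted);
        int numPairs = 0;                                   // primitive local: always confined
        String candidate = null;
        for (String s : sorted) {
            if (candidate == null || !candidate.equals(s)) {
                candidate = s;                              // start a new potential pair
            } else {
                ++numPairs;                                 // found a match for candidate
                candidate = null;
            }
        }
        return numPairs;                                    // only a primitive leaves the method
    }
}
```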
  • The ThreadLocal class
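// Each thread lazily gets its own Connection the first time it calls get().
private static final ThreadLocal<Connection> connectionHolder =
    new ThreadLocal<Connection>() {
        @Override
        protected Connection initialValue() {
            try {
                return DriverManager.getConnection(DB_URL);
            } catch (SQLException e) {   // initialValue() cannot throw checked exceptions
                throw new IllegalStateException(e);
            }
        }
    };
public static Connection getConnection() {
    return connectionHolder.get();
}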
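ThreadLocal is also commonly used to give each thread its own instance of a class that is not thread-safe, such as SimpleDateFormat. In this sketch, PerThreadFormatter and its methods are hypothetical names, and ThreadLocal.withInitial requires Java 8 or later. Keep in mind that pool threads, and therefore their ThreadLocal values, are reused across tasks:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// SimpleDateFormat is not thread-safe; each thread gets its own private instance.
class PerThreadFormatter {
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

    static String format(Date d) {
        return FORMAT.get().format(d);   // this thread's instance only
    }

    static Date parse(String s) {
        try {
            return FORMAT.get().parse(s);
        } catch (ParseException e) {
            throw new IllegalArgumentException(s, e);
        }
    }

    // Returns true if two different threads received two different formatter objects.
    static boolean distinctPerThread() {
        SimpleDateFormat[] seen = new SimpleDateFormat[2];
        Thread a = new Thread(() -> seen[0] = FORMAT.get());
        Thread b = new Thread(() -> seen[1] = FORMAT.get());
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return seen[0] != null && seen[1] != null && seen[0] != seen[1];
    }
}
```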

2.6 Sharing Mutable Objects

To share a mutable object safely, it must be safely published, and it must either be thread-safe itself or be guarded by a lock.

III. Composing Objects

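The first two parts covered the low-level fundamentals of thread safety and synchronization. When designing programs, though, we do not want to analyze every detail like that just to ensure thread safety. This part covers techniques for safely composing existing components into new components and classes.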

3.1 Techniques for Composing Objects

  • Instance confinement (encapsulating non-thread-safe objects)

An object that is not thread-safe, such as a HashSet, can still be used safely if all access to it is confined within another object, here PersonSet, that is itself thread-safe.

@ThreadSafe
public class PersonSet {
    @GuardedBy("this")
    private final Set<Person> mySet = new HashSet<Person>();
    
    public synchronized void addPerson(Person p) {
        mySet.add(p);
    }
    public synchronized boolean containsPerson(Person p) {
        return mySet.contains(p);
    }
}

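Note: this example does not consider whether Person itself is mutable. If Person is mutable, code that uses Person objects obtained from the set needs its own synchronization policy.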
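A runnable version of PersonSet. The immutable Person class, the size method and the demo helper are hypothetical additions; every access to the HashSet goes through methods synchronized on the same lock, so concurrent adds from several threads are never lost:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical immutable Person, so the confined set holds only immutable elements.
final class Person {
    private final String name;

    Person(String name) { this.name = name; }

    @Override public boolean equals(Object o) {
        return o instanceof Person && ((Person) o).name.equals(name);
    }

    @Override public int hashCode() { return name.hashCode(); }
}

// Instance confinement: the HashSet never leaves this object.
class PersonSetSketch {
    private final Set<Person> mySet = new HashSet<>(); // guarded by this

    synchronized void addPerson(Person p) { mySet.add(p); }
    synchronized boolean containsPerson(Person p) { return mySet.contains(p); }
    synchronized int size() { return mySet.size(); }

    // Each of `threads` threads adds `perThread` distinct people; returns the final size.
    static int demo(int threads, int perThread) {
        PersonSetSketch set = new PersonSetSketch();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            ts[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++)
                    set.addPerson(new Person("thread" + id + "-" + i));
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return set.size();
    }
}
```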

The following vehicle-tracking example illustrates other techniques.

  • Synchronization (the Java monitor pattern)
@ThreadSafe
public class MonitorVehicleTracker {
    @GuardedBy("this")
    private final Map<String, MutablePoint> locations;        // vehicle locations
    
    public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
        this.locations = deepCopy(locations);
    }
    public synchronized Map<String, MutablePoint> getLocations() {
        return deepCopy(locations);
    }
    public synchronized  MutablePoint getLocation(String id) {
        MutablePoint loc = locations.get(id);
        return loc == null ? null : new MutablePoint(loc);
    }
    public synchronized  void setLocation(String id, int x, int y) {
        MutablePoint loc = locations.get(id);
        if (loc == null)
            throw new IllegalArgumentException("No such ID: " + id);
        loc.x = x;
        loc.y = y; 
    }
    private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
        Map<String, MutablePoint> result = new HashMap<String, MutablePoint>();
        for (String id : m.keySet())
            result.put(id, new MutablePoint(m.get(id)));
        return Collections.unmodifiableMap(result);
    }
}
@NotThreadSafe
public class MutablePoint {
    public int x, y;
    public MutablePoint() { x = 0; y = 0; }
    public MutablePoint(MutablePoint p) {
        this.x = p.x;
        this.y = p.y;
    }
}
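The effect of deepCopy can be checked directly. This trimmed, self-contained sketch (MonitorTrackerSketch is a hypothetical name, and this MutablePoint takes coordinates in its constructor for convenience) shows that both the constructor argument and the result of getLocations() are copies, so neither the caller's map nor an earlier snapshot changes when the tracker is updated. The cost is a full copy on every call, which can become expensive with many vehicles:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class MutablePoint {
    int x, y;
    MutablePoint(int x, int y) { this.x = x; this.y = y; }
    MutablePoint(MutablePoint p) { this(p.x, p.y); }
}

class MonitorTrackerSketch {
    private final Map<String, MutablePoint> locations; // guarded by this

    MonitorTrackerSketch(Map<String, MutablePoint> initial) { locations = deepCopy(initial); }

    synchronized Map<String, MutablePoint> getLocations() { return deepCopy(locations); }

    synchronized MutablePoint getLocation(String id) {
        MutablePoint loc = locations.get(id);
        return loc == null ? null : new MutablePoint(loc); // never hand out the internal point
    }

    synchronized void setLocation(String id, int x, int y) {
        MutablePoint loc = locations.get(id);
        if (loc == null)
            throw new IllegalArgumentException("No such ID: " + id);
        loc.x = x;
        loc.y = y;
    }

    private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
        Map<String, MutablePoint> result = new HashMap<>();
        for (Map.Entry<String, MutablePoint> e : m.entrySet())
            result.put(e.getKey(), new MutablePoint(e.getValue())); // copy each mutable point
        return Collections.unmodifiableMap(result);
    }
}
```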
  • Delegation

Thread safety is delegated to a ConcurrentHashMap.

@ThreadSafe
public class DelegatingVehicleTracker {
    private final ConcurrentMap<String, Point> locations;
    private final Map<String, Point> unmodifiableMap;
    
    public DelegatingVehicleTracker(Map<String, Point> points) {
        locations = new ConcurrentHashMap<String, Point>(points);
        unmodifiableMap = Collections.unmodifiableMap(locations);
    }
    public Map<String, Point> getLocations() {
        return unmodifiableMap;
    }
    public Point getLocation(String id) {
        return locations.get(id);
    }
    public void setLocation(String id, int x, int y) {
        if (locations.replace(id, new Point(x, y)) == null)
            throw new IllegalArgumentException("invalid vehicle name: " + id);
    } 
}
@Immutable
public class Point {
    public final int x, y;
    public Point(int x, int y) {
        this.x = x;
        this.y = y; 
    }
}

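Note: the map returned by getLocations in this example is a live view: it reflects location updates made after it was returned. If an unchanging snapshot is needed instead, return a shallow copy of the map (a shallow copy is enough because Point is immutable):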

public Map<String, Point> getLocations() {
    return Collections.unmodifiableMap(new HashMap<String, Point>(locations));
}
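The difference between the live view and the shallow-copy snapshot can be demonstrated with a trimmed tracker. DelegatingTrackerSketch, liveView and snapshot are hypothetical names; Point is the immutable class from above, repeated so the example stands alone:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class Point {
    final int x, y;
    Point(int x, int y) { this.x = x; this.y = y; }
}

class DelegatingTrackerSketch {
    private final ConcurrentMap<String, Point> locations;
    private final Map<String, Point> unmodifiableMap;

    DelegatingTrackerSketch(Map<String, Point> points) {
        locations = new ConcurrentHashMap<>(points);
        unmodifiableMap = Collections.unmodifiableMap(locations);
    }

    // Read-only view backed by the live map: later updates show through.
    Map<String, Point> liveView() { return unmodifiableMap; }

    // Shallow copy taken now: later updates do not show through.
    Map<String, Point> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(locations));
    }

    void setLocation(String id, int x, int y) {
        if (locations.replace(id, new Point(x, y)) == null)
            throw new IllegalArgumentException("invalid vehicle name: " + id);
    }
}
```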
  • Publishing the underlying state (a vehicle tracker that publishes its state)

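Using the thread-safe, mutable SafePoint, we can build a vehicle tracker that publishes its underlying mutable state without undermining thread safety.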

@ThreadSafe
public class SafePoint {
    @GuardedBy("this") private int x, y;
    private SafePoint(int[] a) { this(a[0], a[1]); }
    public SafePoint(SafePoint p) { this(p.get()); }
    public SafePoint(int x, int y) {
        this.x = x;
        this.y = y; 
    }
    public synchronized int[] get() {
        return new int[] { x, y };
    }
    public synchronized void set(int x, int y) {
        this.x = x;
        this.y = y;
    }
}
@ThreadSafe
public class PublishingVehicleTracker {
    private final Map<String, SafePoint> locations;
    private final Map<String, SafePoint> unmodifiableMap;
    public PublishingVehicleTracker(Map<String, SafePoint> locations) {
        this.locations = new ConcurrentHashMap<String, SafePoint>(locations);
        this.unmodifiableMap = Collections.unmodifiableMap(this.locations);
    }
    public Map<String, SafePoint> getLocations() {
        return unmodifiableMap;
    }
    public SafePoint getLocation(String id) {
        return locations.get(id);
    }
    public void setLocation(String id, int x, int y) {
        if (!locations.containsKey(id))
            throw new IllegalArgumentException("invalid vehicle name: " + id);
        locations.get(id).set(x, y);
    }
}
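SafePoint returns x and y together from get() so that a caller cannot read x from one update and y from another, which separate getX()/getY() methods (hypothetical, not part of SafePoint) would allow. The sketch repeats SafePoint in package-private form and adds a hypothetical countTornReads helper: a writer keeps calling set(i, i) while the calling thread checks every pair it reads:

```java
class SafePoint {
    private int x, y; // guarded by this

    SafePoint(int x, int y) { this.x = x; this.y = y; }

    synchronized int[] get() { return new int[] { x, y }; }

    synchronized void set(int x, int y) { this.x = x; this.y = y; }

    // Counts readings where x != y while a writer keeps setting x == y.
    static int countTornReads(int iterations) {
        SafePoint p = new SafePoint(0, 0);
        Thread writer = new Thread(() -> {
            for (int i = 1; i <= iterations; i++)
                p.set(i, i);      // x and y change together under one lock
        });
        writer.start();
        int torn = 0;
        for (int i = 0; i < iterations; i++) {
            int[] xy = p.get();   // both coordinates read under one lock acquisition
            if (xy[0] != xy[1])
                torn++;
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return torn;
    }
}
```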

3.2 Adding Functionality to Existing Thread-Safe Classes

Suppose we want to add a put-if-absent operation (assuming no existing thread-safe class provides it). What are the options?

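  • Modify the source code: the source may be unavailable or impossible to change, and even when it can be changed, you first have to understand the class's synchronization policy.
  • Extend the existing class: very direct, but it exposes the base class's state to the subclass. It is also fragile, because the synchronization policy is now spread across separately maintained source files; if the base class changes its policy, the subclass can silently break.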
@ThreadSafe
public class BetterVector<E> extends Vector<E> {
    public synchronized boolean putIfAbsent(E x) {
        boolean absent = !contains(x);
        if (absent)
          add(x);
        return absent;
    } 
}
    
  • Client-side locking:
@NotThreadSafe
public class ListHelper<E> {
    public List<E> list =
        Collections.synchronizedList(new ArrayList<E>());
    ...
    public synchronized boolean putIfAbsent(E x) {
      boolean absent = !list.contains(x);
      if (absent)
        list.add(x);
      return absent;
    } 
}

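Why doesn't this work? It uses the wrong lock. putIfAbsent synchronizes on the ListHelper object, while the synchronized list guards its own operations with a different lock, so putIfAbsent is not atomic with respect to other operations on the list. The fix is to lock on the list itself: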

@ThreadSafe
public class ListHelper<E> {
    public List<E> list =
        Collections.synchronizedList(new ArrayList<E>());
    ...
    public boolean putIfAbsent(E x) {
        synchronized (list) {
            boolean absent = !list.contains(x);
            if (absent)
                list.add(x);
            return absent;
        }
    }
}
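A runnable check of client-side locking. ListHelperSketch and race are hypothetical names. The approach relies on the wrapper returned by Collections.synchronizedList guarding its methods with its own intrinsic lock, which its documentation implies when it tells callers to synchronize on the returned list while iterating. Several threads race to add the same element, and exactly one should succeed:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ListHelperSketch<E> {
    final List<E> list = Collections.synchronizedList(new ArrayList<E>());

    // Client-side locking: hold the list's own lock, the one its methods use.
    boolean putIfAbsent(E x) {
        synchronized (list) {
            boolean absent = !list.contains(x);
            if (absent)
                list.add(x);
            return absent;
        }
    }

    // Races `threads` threads to add the same element; returns {list size, successful adds}.
    static int[] race(int threads) {
        ListHelperSketch<String> helper = new ListHelperSketch<>();
        AtomicInteger wins = new AtomicInteger();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                if (helper.putIfAbsent("x"))
                    wins.incrementAndGet();
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { helper.list.size(), wins.get() };
    }
}
```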
  • Composition
@ThreadSafe
public class ImprovedList<T> implements List<T> {
    private final List<T> list;
    public ImprovedList(List<T> list) { this.list = list; }
    public synchronized boolean putIfAbsent(T x) {
        boolean contains = list.contains(x);
        if (!contains)          // add only when the element is absent
            list.add(x);
        return !contains;
    }
    public synchronized void clear() { list.clear(); }
    // ... similarly delegate other List methods
}
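A trimmed, runnable sketch of the composition approach. ImprovedListSketch exposes only a few methods instead of implementing all of java.util.List, and race is a hypothetical helper. Because the wrapper adds its own lock, the wrapped list can even be a plain ArrayList, as long as callers hand over their reference and stop using the list directly:

```java
import java.util.ArrayList;
import java.util.List;

class ImprovedListSketch<T> {
    private final List<T> list; // guarded by this; callers must not keep using it directly

    ImprovedListSketch(List<T> list) { this.list = list; }

    synchronized boolean putIfAbsent(T x) {
        boolean contains = list.contains(x);
        if (!contains)
            list.add(x);
        return !contains;
    }

    synchronized boolean contains(T x) { return list.contains(x); }
    synchronized int size() { return list.size(); }
    synchronized void clear() { list.clear(); }

    // Races `threads` threads to add the same element to a plain ArrayList; returns the size.
    static int race(int threads) {
        ImprovedListSketch<String> l = new ImprovedListSketch<>(new ArrayList<>());
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> l.putIfAbsent("x"));
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return l.size();
    }
}
```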

3.3 Documenting Synchronization Policies

Document a class's thread safety guarantees for its clients; document its synchronization policy for its maintainers.

Reference

Java Concurrency in Practice (book)

http://www.cs.umd.edu/~pugh/java/memoryModel/jsr133.pdf

http://jcip.net/listings.html

https://stackoverflow.com/questions/1621435/not-thread-safe-object-publishing

https://wiki.sei.cmu.edu/confluence/display/java/TSM03-J.+Do+not+publish+partially+initialized+objects

Original article: https://www.cnblogs.com/withwhom/p/11625225.html