Java 多线程

创建线程

方式一:继承Thread

  • 自定义一个类继承Thread类
  • 重写Thread的run方法,把自定义线程的任务代码定义在run方法上
  • 创建Thread子类的对象,并且调用start方法启动一个线程
public class MyThread extends Thread {
	public void run() {
		for (int i = 0; i < 100; i++)
			System.out.println(getName() + i);
	}
}

// test
    		MyThread my1=new MyThread();//设置线程名
		MyThread my2=new MyThread();
		my1.setName("线程1 ");
		my2.setName("线程2 ");
		my1.start();
		my2.start();

方式二: 实现Runnable接口。

Thread类使用静态代理实现,Thread构造函数接收一个实现Runnable接口的类作为代理类
Thread类本身继承自Runnable接口

  • 自定义一个类实现Runnable接口。
  • 实现Runnable接口中的run方法,把自定义线程的任务代码定义在run方法上
  • 创建Runable实现类 的对象
  • 创建Thread对象,并且把Runnable实现类的对象作为参数传递
  • 调用Thread对象的start方法开启线程。
public class DeamonDemo implements Runnable{

	@Override
	public void run() {
		for(int i = 0;i<100;i++)
			System.out.println(Thread.currentThread().getName()+"---"+i);
	}

}

// test
public class DeamonTest {
	public static void main(String[] args) {
		DeamonDemo d = new DeamonDemo();
		
		Thread d1 = new Thread(d);
		Thread d2 = new Thread(d);

		d1.start();
		d2.start();

	}
}

方式三:实现Callable接口

Callable是线程池的方式创建线程,可以获取到执行函数的返回值,还可以在执行时抛出异常

  • 实现Callable接口,Callable是一个泛型接口,指定call函数返回值类型
  • 实例化实现了Callable接口类
  • 创建线程池Executors.newFixedThreadPool(3),指定线程池大小
  • 提交执行,executor.submit(testCallable1),传递实现了Callable接口类
  • 获取执行结果,executor.submit提交后可获取执行结果
  • 关闭线程池,executorService.shutdown();
public class TestCallable<T> implements Callable<T> {

    @Override
    public T call() throws Exception {
        for (int cnt = 0; cnt < 100;cnt++) {
            System.out.println(Thread.currentThread().getName()+" :cnt = " + cnt);
        }
        return null;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        for (int cnt = 0; cnt < 100;cnt++) {
            System.out.println(Thread.currentThread().getName()+" :cnt = " + cnt);
        }

        // 创建线程类
        TestCallable<Object> testCallable1 = new TestCallable<>();

        // 创建线程池,初始化线程池大小
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 提交执行
        Future<Object> future = executorService.submit(testCallable1);

        // 获取执行结果
        Object result = future.get();

        // 服务关闭
        executorService.shutdown();
    }
}

守护线程

线程分为用户线程守护线程,JVM中只有当所有用户线程结束,守护线程才会结束,守护线程的作用就是为其他线程提供服务,GC就是一个守护线程
用户线程 :是高优先级线程。JVM 会在终止之前等待任何用户线程完成其任务。
守护线程 :是低优先级线程。其唯一作用是为用户线程提供服务。

  • 设置守护线程,一旦主线程结束就结束守护线程
  • 主线程是用户线程
  • 守护线程中产生的线程也是守护线程
                DeamonDemo d = new DeamonDemo();
		
		Thread d1 = new Thread(d);
		Thread d2 = new Thread(d);
				
		d1.setDaemon(true);  //  设置守护线程
		d2.setDaemon(true);
		
		d1.start();
		d2.start();

		for(int i = 0;i<10;i++){
			//打印main线程(主线程)线程名
			System.out.println(Thread.currentThread().getName()+"---"+i);
		}

设置线程优先级

  • IllegalArgumentException
  • void setPriority()
  • int getPriority():优先级范围1-10 默认5
		PriorityDemo p = new PriorityDemo();

		Thread tp1 = new Thread(p);
		Thread tp2 = new Thread(p);
		Thread tp3 = new Thread(p);

		tp1.setName("xyg");
		tp2.setName("wdf");
		tp3.setName("OoO");
		
		tp1.setPriority(10);  //  最高优先级
		tp2.setPriority(1);
		tp3.setPriority(1);
		
		tp1.start();
		tp2.start();
		tp3.start();

线程状态

  • 线程通过thread.getState()方法获取状态

线程的六种状态

  • NEW:未启动状态
  • RUNNABLE:执行中
  • BLOCKED:等待线程锁
  • WAITING:线程等待
  • TIMED_WAITING:线程睡眠
  • TERMINATED:完成执行
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        thread.start();

        while (true) {
            if (thread.getState() == Thread.State.TERMINATED) {
                break;
            }
            System.out.println("thread = " + thread.getState());
        }
    }

...
thread = TIMED_WAITING
thread = TIMED_WAITING
thread = RUNNABLE
thread = RUNNABLE
thread = TERMINATED

线程加入 join

  • 一旦有join()线程,其他线程必须等待
		JoinDemo p = new JoinDemo();

		Thread tp1 = new Thread(p);
		Thread tp2 = new Thread(p);
		Thread tp3 = new Thread(p);

		tp1.setName("xyg");
		tp2.setName("fuck");
		tp3.setName("wdnmd");

		tp1.setPriority(10);
		tp2.setPriority(1);
		tp3.setPriority(1);

		tp1.start();
		try {
			tp1.join();  //  其他线程等待该线程终止
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		tp2.start();
		tp3.start();

线程暂停

  • public static void yield()----Thread:暂停当前正在执行的线程,并执行其他线程
  • 使用yield()的目的是让相同优先级的线程之间能适当的轮转执行。但是,实际中无法保证yield()达到让步目的
  • 因为让步的线程还有可能被线程调度程序再次选中。
	public void run() {
		for(int i = 0;i<100;i++){
			System.out.println(Thread.currentThread().getName()+"---"+i);
			Thread.yield();  //执行其他线程
		}
	}

线程安全

多个线程操作同一个数据,解决:添加锁

同步代码块

  • 同步代码块的锁可以是任意的对象。 同步函数的锁是固定 的,非静态函数的锁对象是this对象。 静态函数的锁对象是class对象。
  • 锁对象必须是多线程共享的对象,否则锁不住
  • 在同步代码块或者是同步函数中调用sleep方法是不会释放锁对象的,如果是调用了wait方法是会释放锁对象的。
        ...
	public void run() {
			if (x%2==0) {
				//同步代码块
				synchronized (this) {// 多个线程使用同一个锁对象
					if (ticket > 0) {
						try {
							Thread.sleep(100);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println(Thread.currentThread().getName() + "正在出售第" + (ticket--) + "张票。");
					}
				}
			} 

方法锁

  • 同步代码块的锁可以是任意的对象。 同步函数的锁是固定 的,非静态函数的锁对象是this对象。 静态函数的锁对象是class对象。

	public void run() {
            check();
        }
        ...
	//同步方法
	//同步方法的锁对象是this对象
	//静态同步方法的锁对象是 类名.class Class类型对象
	private synchronized void check() {
			if (ticket > 0) {
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName() + "正在出售第" + (ticket--) + "张票。");
			}
	}

Arraylist线程安全

  • 使用了同步代码块锁住了arraylist的添加
  • sleep为了解决主线程过早结束
public static void main(String[] args) {
        ArrayList<Object> objects = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread thread = new Thread(() -> {
                synchronized (objects){
                    objects.add(Thread.currentThread().getName());
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("objects = " + objects.size());
    }

Juc的线程安全集合

public class JucList {
    public static void main(String[] args) {
        CopyOnWriteArrayList<Object> objects = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Thread thread = new Thread(() -> {
                    objects.add(Thread.currentThread().getName());
            });
            thread.start();
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("objects = " + objects.size());
    }
}

ReentrantLock

  • ReentrantLock()默认构造非公平锁,构造函数传递true实例化为公平锁

公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序, 非公平锁则允许线程“插队”

  • ReentrantLock.lock():加锁
  • ReentrantLock.unLock():解锁
public class ReentrantLockTest {
    public static void main(String[] args) throws InterruptedException {

        Ticket t1 = new Ticket();

        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t1);

        thread1.start();
        thread2.start();

    }

}

class Ticket implements Runnable {
    Integer ticket = 100;

    /**
     * 公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序
     * 非公平锁则允许线程“插队”
     */
    private Lock reenlock = new ReentrantLock(); //初始为非公平锁
    //private Lock reenlock = new ReentrantLock(true); // 初始化为公平锁

    @Override
    public void run() {
        while (true) {
            // 锁机制
            reenlock.lock();
            try {
                if (ticket > 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "正在出售第" + (ticket--) + "张票。");
                } else {
                    break;
                }
            } finally {
                // 解锁
                reenlock.unlock();
            }
        }
    }

}

synchronized 和 ReentrantLock

  • 可重入锁是指同一个线程可以多次获取同一把锁,synchronized 和 ReentrantLock都是可重入锁
  • 是否可以响应中断,synchronized 过程中不可响应中断,ReentrantLock提供了中断功能
  • synchronized是非公平锁,ReentrantLock可通过构造方法指定公平锁和非公平锁,默认构造为非公平锁

死锁

线程死锁是指两个或两个以上的线程互相持有对方所需要的资源,由于synchronized的特性,一个线程持有一个资源,或者说获得一个锁,在该线程释放这个锁之前,其它线程是获取不到这个锁的,而且会一直死等下去,因此这便造成了死锁

public class DeadLock {
    private Object lock1 = new Object();
    private Object lock2 = new Object();

    public void thread1Method() {
        synchronized (lock1) {
            System.out.println(Thread.currentThread().getName()+":获取到lock1,请求lock2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lock2) {
                System.out.println(Thread.currentThread().getName()+":获取到lock2");
            }
        }
    }

    public void thread2Method() {
        synchronized (lock2) {
            System.out.println(Thread.currentThread().getName()+":获取到lock2,请求lock1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lock1) {
                System.out.println(Thread.currentThread().getName()+":获取到lock1");
            }
        }
    }

    public static void main(String[] args) {
        DeadLock deadLock = new DeadLock();
        new Thread(()->{
            deadLock.thread1Method();
        }).start();

        new Thread(()->{
            deadLock.thread2Method();
        }).start();
    }
}

线程通信

  • wait() 让当前线程释放对象锁并进入等待(阻塞)状态
  • notify() :唤醒一个正在等待相应对象锁的线程,使其进入就绪队列
  • notifyAll():唤醒所有正在等待相应对象锁的线程,使它们进入就绪队列

wait()和sleep()

  • wait()是Object类方法,释放锁对象
  • sleep()是Thread类的的静态方法,不释放锁对象
	public synchronized void set(String name, int age) {
		//如果有数据则等待
		if (flag) {
			try {
				wait();  //  线程等待
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		//设置值
		this.name=name;
		this.age=age;

		// 修改标记
		flag = true;
		notify();// 线程唤醒
	}
    
        ...

	public synchronized void get(){
		//如果没有数据就等待
		if(!flag){
			try {
				wait();  // 线程等待
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		System.out.println(name+" "+age);
		
		//修改标记
		flag=false;
		notify();  // 线程唤醒
	}

生产者消费者问题

/**
 * 生产者消费者问题
 * 生产者不停生产,消费者不停消费。。
        1 products ware produced.
        1 product ware Consumed.
        2 products ware produced.
        3 products ware produced.
        4 products ware produced.
        2 product ware Consumed.
        5 products ware produced.
        3 product ware Consumed.
        6 products ware produced.
        4 product ware Consumed.
        7 products ware produced.
        5 product ware Consumed.
        ...
 */
public class ProducersAndConsumers {
    public static void main(String[] args) {
        Depostory depostory = new Depostory();

        Producers producers = new Producers(depostory);
        Consumers consumers = new Consumers(depostory);

        new Thread(producers).start();
        new Thread(consumers).start();
    }

}

// 产品
class Product{
    int id;

    public Product(int id) {
        this.id = id;
    }
}

// 生产者
class Producers implements Runnable{
    Depostory depostory;

    public Producers(Depostory depostory) {
        this.depostory = depostory;
    }

    @Override
    public void run() {
        for (int i = 1; ; i++) {
            depostory.push(new Product(i));
            System.out.println(i+" products"+" ware produced.");
        }
    }
}

// 消费者
class Consumers implements Runnable{

    Depostory depostory;

    public Consumers(Depostory depostory) {
        this.depostory = depostory;
    }


    @Override
    public void run() {
        while (true){
            Product pop = depostory.pop();
            System.out.println(pop.id+" product "+ "ware Consumed.");
        }
    }
}

// 仓库
class Depostory{
    List<Product> products = new ArrayList<>();
    private int size = 5; // 仓库容量

    // 生产者生产产品
    public synchronized void push(Product product){
        if (size == products.size()) {
            // 等待消费者消费
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        products.add(product);
        notifyAll(); //通知消费者消费
    }

    // 消费者消费产品
    public synchronized Product pop(){
        if (products.size() == 0) {
            try {
                wait(); // 线程等待,等待生产者生产
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Product product = products.get(0);
        products.remove(product);
        notifyAll(); // 线程唤醒,通知
        return product;
    }
}

通过事件通知线程

/**
 * 线程2等待线程2运行n秒后再运行,使用标记事件的方式
 */
public class ThreadEventDemo {
    public static void main(String[] args) {
        Event event = new Event();

        Thread1 thread1 = new Thread1(event);
        Thread2 thread2 = new Thread2(event);

        new Thread(thread1).start();
        new Thread(thread2).start();
    }
}

// Thread1
class Thread1 implements Runnable {
    Event evnet;
    int cnt = 0;

    public Thread1(Event evnet) {
        this.evnet = evnet;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            cnt++;
            if (cnt == 2) {
                evnet.flag = true; // 设置事件
            }
            System.out.println("thread1 run for " + cnt + "seconds.");

        }
    }
}

// Thread2
class Thread2 implements Runnable {
    Event evnet;

    public Thread2(Event evnet) {
        this.evnet = evnet;
    }

    @Override
    public void run() {
        while (true) {
            // 等待事件激活
            if (evnet.flag) {
                System.out.println("thread2 running..");
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// Event
class Event {
    boolean flag = false; // 事件默认未激活
}

管道通信

  • PipedOutputStream:可以向管道中写入数据,
  • PipedIntputStream:可以读取PipedOutputStream向管道中写入的数据,这两个类主要用来完成线程之间的通信

PipedInputStream中,有一个buffer字节数组,默认大小为1024,作为缓冲区

  1. 构造PipedInputStream、PipedOutputStream
  2. 使用connect方法连接对方
  3. PipedOutputStream.write(bytes)方法向通道写入字节数组
  4. length = PipedIntputStream.read(bytes)方法向通道读取数组,当返回length为-1时读取完全部
/**
 * 线程间,管道通信
 */
public class PipedDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream();

        //pipedInputStream.connect(pipedOutputStream);  // 效果相同
        pipedOutputStream.connect(pipedInputStream);

        new Thread(new WriteThread(pipedOutputStream)).start();
        Thread.sleep(1000);
        new Thread(new ReadThread(pipedInputStream)).start();
    }
}

// 写线程
class WriteThread implements Runnable {
    PipedOutputStream pipedOutputStream;

    public WriteThread(PipedOutputStream pipedOutputStream) {
        this.pipedOutputStream = pipedOutputStream;
    }

    @Override
    public void run() {
        System.out.println("write:");
        try {
            for (int i = 0; i < 30; i++) {
                System.out.print(i);
                byte[] bytes = Integer.toString(i).getBytes();
                pipedOutputStream.write(bytes);
            }
            pipedOutputStream.close();
            System.out.println();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// 读线程
class ReadThread implements Runnable {
    PipedInputStream pipedInputStream;

    public ReadThread(PipedInputStream pipedInputStream) {
        this.pipedInputStream = pipedInputStream;
    }

    @Override
    public void run() {
        byte[] bytes = new byte[20];
        try {
            int length = pipedInputStream.read(bytes);
            System.out.println("read:");
            // -1 表示读取完了
            while (-1 != length) {
                String s = new String(bytes, 0, length);
                System.out.print(s);
                length = pipedInputStream.read(bytes);
            }
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * output:
 *
     write:
     01234567891011121314151617181920212223242526272829
     read:
     01234567891011121314151617181920212223242526272829
 *
 */
原文地址:https://www.cnblogs.com/xiongyungang/p/12495740.html