线程

什么是线程

线程(thread)是操作系统能够进行「运算调度」的最小单位,其是进程中的一个执行任务(控制单元),负责当前进程中程序的执行。

一个进程至少有一个线程,一个进程可以运行多个线程,这些线程共享同一块内存,线程之间可以共享对象、资源,如果有冲突或需要协同,还可以随时沟通以解决冲突或保持同步

进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源),线程真正被CPU执行,线程需要的资源跟所在进程的要

为什么要用线程

开设线程的消耗远远小于进程,开一个新的进程需要先申请新的内存空间,而一个进程可以开设多个进程,无需再另外申请空间,并且一个进程内的多个线程数据是共享的

线程与进程区别

  • 本质区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
  • 在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
  • 「所处环境」:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
  • 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
  • 包含关系:没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的;线程是进程的一部分,所以线程也被称为轻权进程或者轻量级进程

举个例子:进程=火车,线程=车厢

  • 线程在进程下行进(单纯的车厢无法运行)
  • 一个进程可以包含多个线程(一辆火车可以有多个车厢)
  • 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
  • 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
  • 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)

注意:进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程。

python创建线程

方式一

# 利用函数
from threading import Thread
import time


def task(name):
    print(f"{name} is running")
    time.sleep(3)
    print(f'{name} is over')


# 创建线程无需在__main__下面编写 但是为了统一 还是习惯在子代码中写
t = Thread(target=task, args=('kevin',))
t.start()  # 创建线程的开销极小 几乎是一瞬间就可以创建
print('主线程')

# kevin is running
# 主线程
# kevin is over

方式二

# 利用类
from threading import Thread
import time


class MyThread(Thread):
    def __init__(self, username):
        super().__init__()
        self.username = username

    def run(self):
        print(f'{self.username} is running')
        time.sleep(3)
        print(f'{self.username} is over')


t = MyThread('kevin')
t.start()d
print('主线程')

# kevin is running
# 主线程
# kevin is over

线程实现TCP并发

服务端

from threading import Thread
import socket

HOST = '127.0.0.1'
PORT = 8080
server = socket.socket()
server.bind((HOST, PORT))
server.listen()


def talk(sock):
    while True:
        data = sock.recv(1024)
        print(data.decode('utf8'))
        sock.send(data.upper())


while True:
    sock, addr = server.accept()
    t = Thread(target=talk, args=(sock,))
    t.start()

客户端

import socket

client = socket.socket()

client.connect(('127.0.0.1', 8080))

while True:
    client.send(b'hello world')
    data = client.recv(1024)
    print(data.decode('utf8'))

线程对象属性和方法

join方法

让主线程等待子线程运行完毕再执行

from threading import Thread
import time


def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')


t = Thread(target=task, args=('kevin',))
t.start()
t.join()  # 主线程代码等待子线程代码运行完毕之后再往下执行
print('主线程')

# kevin is running
# kevin is over
# 主线程

统计活跃的线程数

active_count()

from threading import Thread,active_count
import os
import time

def task():
    time.sleep(3)
    print('子线程获取进程号>>>:',os.getpid())

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    print(active_count())
    print('主线程获取进程号>>>:',os.getpid())

获取线程的名字

current_thread().name

from threading import Thread,active_count,current_thread
import os
import time

def task():
    time.sleep(3)
    print('子线程获取进程号>>>:',os.getpid())
    print(current_thread().name)

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()
    print(current_thread().name)
    print('主线程获取进程号>>>:',os.getpid())

通过类来获取名字

from threading import Thread,active_count,current_thread
import os
import time

class MyThread(Thread):
    def run(self):
        print(self.name)

if __name__ == '__main__':
    t1 = MyThread()
    t2 = MyThread()
    t1.start()
    t2.start()

# Thread-1
# Thread-2

同一个进程内多个线程数据共享

from threading import Thread

money = 999
def task():
    global money
    money = 666

t = Thread(target=task)
t.start()
t.join()
print(money)
# 666

守护进程

from threading import Thread
import time


def task(name):
    print(f'{name} is running')
    time.sleep(3)
    print(f'{name} is over')


t = Thread(target=task, args=('kevin',))
t.daemon = True
t.start()
print('主线程')

# kevin is running
# 主线程

计算密集型与IO密集型程序

计算密集型

​ 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。直言比喻 比如造原子弹 需要大量计算这种

IO密集型

大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。

IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。

由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。

为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。

所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。

GIL全局解释器锁

官方文档

​ In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

理解

​ GIL只存在于CPython解释器中,不是python的特征,GIL是一把互斥锁,用于阻止同一个进程下的多个线程同时执行。原因是因为CPython解释器中的垃圾回收机制不是线程安全的。

​ 反向验证GIL的存在如果不存在会产生垃圾回收机制与正常线程之间数据错乱,GIL是加在CPython解释器上面的互斥锁,同一个进程下的多个线程要想执行必须先抢GIL锁,所以同一个进程下多个线程肯定不能同时运行,即无法利用多核优势。但是多个任务都是IO密集型的那么多线程优势就体现出来了(消耗的资源更少)

GIL和互斥锁的区别

验证GIL的存在

from threading import Thread, Lock
import time

money = 100


def task():
    global money
    # time.sleep(0.1)
    money -= 1


for i in range(100):  # 创建100个线程
    t = Thread(target=task)
    t.start()

print(money)
# 0

同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要额外的添加互斥锁

from threading import Thread, Lock
import time

money = 100
mutex = Lock()


def task():
    global money
    mutex.acquire()
    tmp = money
    time.sleep(0.1)
    money = tmp - 1
    mutex.release()
    # 抢锁放锁也有简便写法(with上下文管理)

t_list = []
for i in range(100):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
# 应该等待所有的线程运行完毕再打印money
print(money)
# 100

验证多线程的作用

计算密集型

​ 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

多进程

from multiprocessing import Process
import os
import time


def task():
    res = 1
    for i in range(1, 10000):
        res *= i


if __name__ == '__main__':
    print(os.cpu_count())  # 查看当前计算机CPU个数
    start_time = time.time()
    p_list = []
    for i in range(10):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print('计算密集型,多进程执行总耗时:%s' % (time.time() - start_time))
    # 计算密集型,多进程执行总耗时:0.07307910919189453

多线程

from threading import Thread
import time


def task():
    res = 1
    for i in range(1, 10000):
        res *= i


start_time = time.time()
t_list = []
for i in range(10):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print('计算密集型,多线程执行总耗时:%s' % (time.time() - start_time))
# 计算密集型,多线程执行总耗时:0.29865002632141113

结论

经过两者对比可以看出,在计算密集型多进程多线程更有优势

IO密集型

​ 大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。

​ 为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。

所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。

多进程

from multiprocessing import Process
import time


def task():
    time.sleep(0.1)  # 模拟纯IO操作


if __name__ == '__main__':
    start_time = time.time()
    p_list = []
    for i in range(100):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print('IO密集型,多进程总耗时:%s' % (time.time() - start_time))
    # IO密集型,多进程总耗时:0.26693105697631836

多线程

from threading import Thread
import time


def task():
    time.sleep(0.1)  # 模拟纯IO操作


start_time = time.time()
t_list = []
for i in range(10):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print('IO密集型,多线程总耗时:%s' % (time.time() - start_time))
# IO密集型,多线程总耗时:0.10392904281616211

结论

经过两者对比可以看出,在IO密集型线进程进线程更有优势

死锁现象

​ 死锁现象指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

​ 在多道程序系统中,由于多个进程的并发执行,改善了系统资源的利用率并提高了系统的处理能力。然而,多个进程的并发执行也带来了新的问题——死锁。所谓死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。

实现死锁现象

from threading import Thread, Lock
import time

mutexA = Lock()
mutexB = Lock()


class MyThread(Thread):
      
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        time.sleep(2)
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexA.release()
        mutexB.release()


for i in range(10):
    t = MyThread()
    t.start()

原因

​ 线程1 先开始执行func1,分别拿到AB锁,然后再释放AB锁,线程1继续先执行func2,先拿到了B锁,开始sleep,这时线程2拿到了A锁。这时候就形成了僵局,线程2想要线程1手里的B锁,线程1想要线程2里的A锁。

解决死锁现象

from threading import Thread, RLock
import time

mutexA = RLock()
mutexB = mutexA 


class MyThread(Thread):

    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(f'{self.name}抢到了B锁')
        time.sleep(2)
        mutexA.acquire()
        print(f'{self.name}抢到了A锁')
        mutexA.release()
        mutexB.release()


for i in range(10):
    t = MyThread()
    t.start()

信号量

​ 在互斥锁中同时只允许一个线程更改数据,而在信号量Semaphore是同时(无序轮流)允许一定数量(设置参数)的线程更改数据 ,内置的计数器,每当调用acquire()时减1,调用release()时加1。计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()

from threading import Thread, Semaphore
import time
import random

sp = Semaphore(5)  # 每次最大5个线程运行


def task(num):
    sp.acquire()  # 抢锁
    print("线程%s,抢锁" % num)
    time.sleep(random.randint(1, 5))
    sp.release()  # 放锁
    print("线程%s,放锁" % num)


for i in range(20):
    t = Thread(target=task, args=(i,))
    t.start()

event事件

事件event中有一个全局内置标志Flag,值为 True 或者False。使用wait()函数的线程会处于阻塞状态,此时Flag指为False,直到有其他线程调用set()函数让全局标志Flag置为True,其阻塞的线程立刻恢复运行,还可以用isSet()函数检查当前的Flag状态.

函数描述
set()全局内置标志Flag,将标志Flag 设置为 True,通知在等待状态(wait)的线程恢复运行;
isSet()获取标志Flag当前状态,返回True 或者 False;
wait()一旦调用,线程将会处于阻塞状态,直到等待其他线程调用set()函数恢复运行;
clear()将标志设置为False;
from threading import Thread, Event
import time
import random

event = Event()  # 创建event事件,造了一个红绿灯


def light():
    print('红灯亮,所有人不能动')
    time.sleep(3)
    print('绿灯亮,油门踩到底,冲!!!')
    event.set()


def car(name):
    print("选手%s号,正在等红灯" % name)
    event.wait()
    print("选手%s号加油门,飙车了" % name)


t = Thread(target=light)
t.start()
for i in range(20):
    t = Thread(target=car, args=(i,))
    t.start()

补充:事件Event主要用于唤醒正在阻塞等待状态的线程

进程池与线程池(重点)

保证硬件能够正常工作的情况下,最大限度的利用资源。如何理解这句话呢?比如TCP服务端实现了并发效果,来一个人就开始一个进程或线程来服务这个人,那么如果来一个亿人呢?无论是开设进程还是线程都需要消耗资源,只是开设线程的资源会比进程小一点,但是也经不起一个一个开,瓶颈是计算机硬件资源无法跟上。

“池”的意思就是在保证计算机能够正常运行的前提下,能最大限度开采的资源的量,虽然降低了程序的运行效率,但是保证了计算机硬件的安全,而让程序可以正常运行

Python提供了 concurrent.futures 模块,这个模块具有线程池(ThreadPoolExecutor)和进程`池(ProcessPoolExecutor`),用来管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能

常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。
  • cancel():取消某个任务
  • done():判断某一个线程是否完成
  • result(timeout=None):取得结果,如果代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。

官方文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html

进程池

提前创建好固定数量的进程,后续反复使用这些进程

from concurrent.futures import ProcessPoolExecutor
import os, time, random


def task(n):
    print('PID: %s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)  # 线程池线程数默认是CPU个数的五倍 也可以自定义
    futures = []
    for i in range(11):
        t = pool.submit(task, i)  # 将可调用对象封装为异步执行
        futures.append(t)
    pool.shutdown(True)  # 等待返回结果才能继续往下执行
    for future in futures:
        print(future.result())

线程池

提前创建好固定数量的线程,后续反复使用这些线程

from concurrent.futures import ThreadPoolExecutor
import os, time, random


def task(n):
    print('PID: %s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


def fn(n):
    print(n.result())


if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)  # 线程池线程数默认是CPU个数的五倍 也可以自定义

    for i in range(11):
        pool.submit(task, i).add_done_callback(fn)  # 一旦某个线程执行完成任务,就会执行执行fn方法,并且fn方法的第一个参数就是future对象 

总结

需要掌握的就这三行代码

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

pool = ThreadPoolExecutor()
pool.submit(task, i).add_done_callback(call_back)
Last modification:April 23, 2022
如果觉得我的文章对你有用,请随意赞赏