进程与线程

进程

进程本质上就是程序数据从硬盘中读取到内存中运行,从而形成一个进程。

进程运行三种状态:

img

1、就绪状态。某些进程“万事俱备”(必要资源),只差CPU。(就绪队列)

2、执行状态。某进程占有CPU并在CPU上执行其程序。

3、阻塞状态。某些进程由于某种原因不能继续运行下去,等待处 理问题。也称为等待状态或封锁状态。如:请求I/O。(多个等待队列)

进程状态转换的原因:

1)就绪-→执行:

对就绪状态的进程,当进程调度程序按一种选定的策略从中选中一个就绪进程,为之分配了处理机后,该进程便由就绪状态变为执行状态;

2)执行-→阻塞:

正在执行的进程因发生某等待事件而无法执行,则进程由执行状态变为阻塞状态。

如:进程提出输入/输出请求而变成等待外部设备传输信息的状态,进程申请资源(主存空间或外部设备)得不到满足时变成等待资源状态,进程运行中出现了故障(程序出错或主存储器读写错等)变成等待干预状态等等;

3)阻塞-→就绪:

处于阻塞状态的进程,在其等待的事件已经完成,如输入/输出完成,资源得到满足或错误处理完毕时,处于等待状态的进程并不马上转入执行状态,而是先转入就绪状态,然后再由系统进程调度程序在适当的时候将该进程转为执行状态;

4)执行-→就绪:

正在执行的进程,因时间片用完而被暂停执行,或在采用抢先式优先级调度算法的系统中,当有更高优先级的进程要运行而被迫让出处理机时,该进程便由执行状态转变为就绪状态。

阻塞与非阻塞

阻塞与非阻塞是用来定义程序的运行状态:

​ 阻塞:阻塞态
​ 非阻塞:就绪态、运行态

同步和异步

正常情况下,一个进程运行首先进入就绪态,因一个cpu不能同时调度多个作业,所以需要排队等待进入运行态。进入运行态时,此时的进程正在被cpu调度运行。当进程进入i/o状态时,此时该进程就会从运行态变为阻塞态(系统此时会把cpu分给其他进程)。

1
2
3
4
5
6
7
"""描述的是任务的提交方式"""
同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事(干等)
程序层面上表现出来的感觉就是卡住了

异步:任务提交之后,不原地等待任务的返回结果,直接去做其他事情
我提交的任务结果如何获取?
任务的返回结果会有一个异步回调机制自动处理

开启进程的两种方式

定心丸:代码开启进程和线程的方式,代码书写基本是一样的,你学会了如何开启进程就学会了如何开启线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from multiprocessing import Process
import time


def task(name):
print('%s is running'%name)
time.sleep(3)
print('%s is over'%name)


if __name__ == '__main__':
# 1 创建一个对象
p = Process(target=task, args=('jason',))
# 容器类型哪怕里面只有1个元素 建议要用逗号隔开
# 2 开启进程
p.start() # 告诉操作系统帮你创建一个进程 异步
print('主')


# 第二种方式 类的继承
from multiprocessing import Process
import time


class MyProcess(Process):
def run(self):
print('hello bf girl')
time.sleep(1)
print('get out!')


if __name__ == '__main__':
p = MyProcess()
p.start()
print('主')

总结

1
2
3
4
5
6
"""
创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
一个进程对应在内存中就是一块独立的内存空间
多个进程对应在内存中就是多块独立的内存空间
进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块
"""

join方法

join是让主进程等待子进程代码运行结束之后,再继续运行。不影响其他子进程的执行

下面的代码当主进程P.start执行运行子进程代码后,主进程并不会等子进程运行完再往下运行,所以正常情况下程序输出‘主进程已经结束这句话不会再最后面’,如果想让主进程等待子进程执行完字节代码再往下执行就在再需要等待子进程执行完再执行的主代码前面加上join方法

image-20211109103747341

方法如下,此时主进程会等待子进程执行完再打印’主进程已经结束’。

image-20211109104132647

管理进程pid

​ 方法: os.getpid,os.getppid,进程对象.terminate,进程对象.is_alive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"""
一台计算机上面运行着很多进程,那么计算机是如何区分并管理这些进程服务端的呢?
计算机会给每一个运行的进程分配一个PID号
如何查看
windows电脑
进入cmd输入tasklist即可查看
tasklist |findstr PID查看具体的进程
mac电脑
进入终端之后输入ps aux
ps aux|grep PID查看具体的进程
"""
from multiprocessing import Process, current_process
current_process().pid # 查看当前进程的进程号

import os
os.getpid() # 查看当前进程进程号
os.getppid() # 查看当前进程的父进程进程号


p.terminate() # 杀死当前进程
# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
time.sleep(0.1)
print(p.is_alive()) # 判断当前进程是否存活

僵尸进程与孤儿进程(了解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 僵尸进程
"""
死了但是没有死透
当你开设了子进程之后 该进程死后不会立刻释放占用的进程号
因为我要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。。。
所有的进程都会步入僵尸进程
父进程不死并且在无限制的创建子进程并且子进程也不结束
回收子进程占用的pid号
父进程等待子进程运行结束
父进程调用join方法
"""

# 孤儿进程
"""
子进程存活,父进程意外死亡
操作系统会开设一个“儿童福利院”专门管理孤儿进程回收相关资源
"""

守护进程

在python中,当子进程设置守护进程以后,如果主进程结束,此时子进程也会立即结束,主进程不会在等待子进程运行完毕。这里设置守护进程必须在子进程启动以前设置,否则报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import time

def ceshi():
print('子进程正在运行')
time.sleep(3)
print('子进程已经结束')

if __name__ == '__main__':
P = Process(target=ceshi,)
print('主进程正在运行')
P.daemon = True
P.start()
print('主进程已经结束')

输出结果:

image-20211109111009514

这里甚至子进程都没有运行出来就结束了(主进程运行速度太快了,子进程刚创建还没来得及运行,主进程就结束了,并直接结束了子进程。)

如果这里主进程sleep1秒的话,则子进程就开始运行了。

image-20211109111117503

互斥锁

多个进程操作同一份数据的时候,会出现数据错乱的问题

针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from multiprocessing import Process
def write(i):
with open('ceshi.txt','w') as f:
f.write(str(i))
def read():
with open('ceshi.txt','r') as f:
i=f.read()
return i
#定义读和写两个函数
def ceshi():
i = int(read())
if i == 1:
i -= 1
write(i)
print('测试1判断i=1,则将i-1')

def ceshi2():
i = int(read())
if i == 1:
i -= 1
write(i)
print('测试2判断i=1,则将i-1')

#两个测试函数分别判断i是否等于1,如果为1则修改
if __name__ == '__main__':
i=1
write(i)
#读取文件时先将1设置为1
P = Process(target=ceshi,)
P2 = Process(target=ceshi2,)
P.start()
P2.start()
P.join()
P2.join()
i = read() #等两个文件操作完再读取文件
print(f'主进程已经结束,i的值为{i}')

两个子进程运行时,都会读取判断文件ceshi.txt里面的值是否为1,如果某一个文件先读取,那么只有一个文件能修改到值,那程序无任何问题,但当两个子进程同时读取文件时,就会出现两个进程都判断其值为1且对其进行修改值操作,显然对于程序来说,不应如此。例如火车抢票,此时票数为1,抢到票时火车的票就-1,如果两个人都同时抢票发现这个时候票为1,于是就都会抢票成功,显然这是一个bug,此时应该在上面加上锁。

改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import Process,Lock
def write(i):
with open('ceshi.txt','w') as f:
f.write(str(i))
def read():
with open('ceshi.txt','r') as f:
i=f.read()
return i
def ceshi(lock):
l=run(lock)
if l is True:
print('测试1判断i=1,则将i-1')

def ceshi2(lock):
l=run(lock)
if l is True:
print('测试2判断i=1,则将i-1')
def run(lock):
lock.acquire()
i = int(read())
if i == 1:
i -= 1
write(i)
lock.release()
return True
else:
return False

if __name__ == '__main__':
i=1
write(i)
lock = Lock()
P = Process(target=ceshi,args=(lock,))
P2 = Process(target=ceshi2,args=(lock,))
P.start()
P2.start()
P.join()
P2.join()
i = read()
print(f'主进程已经结束,i的值为{i}')

从run函数可以看到,此时读和写都从并发改为串行了,此时只能一个一个读然后再写,真正再项目环境可以把读写分开,以免一个人在读数据时其他人都要等着,应该是一个人在写数据时再上锁。

进程间通信

队列Queue模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""
管道:subprocess
stdin stdout stderr
队列:管道+锁

队列:先进先出
堆栈:先进后出
"""
from multiprocessing import Queue

# 创建一个队列
q = Queue(5) # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量

# 往队列中存数据
q.put(111)
q.put(222)
q.put(333)
# print(q.full()) # 判断当前队列是否满了
# print(q.empty()) # 判断当前队列是否空了
q.put(444)
q.put(555)
# print(q.full()) # 判断当前队列是否满了

# q.put(666) # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错

"""
存取数据 存是为了更好的取
千方百计的存、简单快捷的取

同在一个屋檐下
差距为何那么大
"""

# 去队列中取数据
v1 = q.get()
v2 = q.get()
v3 = q.get()
v4 = q.get()
v5 = q.get()
# print(q.empty())
# V6 = q.get_nowait() # 没有数据直接报错queue.Empty
# v6 = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue.Empty
try:
v6 = q.get(timeout=3)
print(v6)
except Exception as e:
print('一滴都没有了!')

# # v6 = q.get() # 队列中如果已经没有数据的话 get方法会原地阻塞
# print(v1, v2, v3, v4, v5, v6)

"""
q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确
"""

IPC机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Queue, Process

"""
研究思路
1.主进程跟子进程借助于队列通信
2.子进程跟子进程借助于队列通信
"""
def producer(q):
q.put('我是23号技师 很高兴为您服务')


def consumer(q):
print(q.get())


if __name__ == '__main__':
q = Queue()
p = Process(target=producer,args=(q,))
p1 = Process(target=consumer,args=(q,))
p.start()
p1.start()

生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
"""
生产者:生产/制造东西的
消费者:消费/处理东西的
该模型除了上述两个之外还需要一个媒介
生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿
厨师做菜做完之后用盘子装着给你消费者端过去
生产者和消费者之间不是直接做交互的,而是借助于媒介做交互

生产者(做包子的) + 消息队列(蒸笼) + 消费者(吃包子的)
"""
from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def producer(name,food,q):
for i in range(5):
data = '%s生产了%s%s'%(name,food,i)
# 模拟延迟
time.sleep(random.randint(1,3))
print(data)
# 将数据放入 队列中
q.put(data)


def consumer(name,q):
# 消费者胃口很大 光盘行动
while True:
food = q.get() # 没有数据就会卡住
# 判断当前是否有结束的标识
# if food is None:break
time.sleep(random.randint(1,3))
print('%s吃了%s'%(name,food))
q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了


if __name__ == '__main__':
# q = Queue()
q = JoinableQueue()
p1 = Process(target=producer,args=('大厨egon','包子',q))
p2 = Process(target=producer,args=('马叉虫tank','泔水',q))
c1 = Process(target=consumer,args=('春哥',q))
c2 = Process(target=consumer,args=('新哥',q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
# 等待生产者生产完毕之后 往队列中添加特定的结束符号
# q.put(None) # 肯定在所有生产者生产的数据的末尾
# q.put(None) # 肯定在所有生产者生产的数据的末尾
q.join() # 等待队列中所有的数据被取完再执行往下执行代码
"""
JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
没当你调用task_done的时候 计数器-1
q.join() 当计数器为0的时候 才往后运行
"""
# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了

线程理论

  • 什么是线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    """
    进程:资源单位
    线程:执行单位

    将操作系统比喻成一个大的工厂
    那么进程就相当于工厂里面的车间
    而线程就是车间里面的流水线

    每一个进程肯定自带一个线程

    再次总结:
    进程:资源单位(起一个进程仅仅只是在内存空间中开辟一块独立的空间)
    线程:执行单位(真正被cpu执行的其实是进程里面的线程,线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要)

    进程和线程都是虚拟单位,只是为了我们更加方便的描述问题
    """
  • 为何要有线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    """
    开设进程
    1.申请内存空间 耗资源
    2.“拷贝代码” 耗资源
    开线程
    一个进程内可以开设多个线程,在用一个进程内开设多个线程无需再次申请内存空间操作

    总结:
    开设线程的开销要远远的小于进程的开销
    同一个进程下的多个线程数据是共享的!!!
    """
    我们要开发一款文本编辑器
    获取用户输入的功能
    实时展示到屏幕的功能
    自动保存到硬盘的功能
    针对上面这三个功能,开设进程还是线程合适???
    开三个线程处理上面的三个功能更加的合理
  • 如何使用

开启线程的两种方式

导入模块式

image-20211111094637428

1
2
import Thread
duxiangming=Thread(target=ceshi,args=(i,))

集成类式

1
2
3
4
5
6
7
8
9
from threading import Thread
class ceshi(Thread):
def run(self):
print('g')

if __name__ == '__main__':
for i in range(5):
cs = ceshi()
cs.start()

线程与进程区别

在使用方法上,例如join,互斥锁,join方法,queue消息队列都与进程一致。两者的区别在于线程无需开辟新的内存空间,进程的创建需要开启新的内存空间,因此线程相较于进程来讲,其创建的开销要小很多,但是由于cpython(python用的最多的解释器)的GIL解释器锁的原因,python线程无法实现真正的多线程(利用多核),只能单核使用,但由于其线程创建开销少,所以还是适用于密集型i/o程序,如果是密集型运算程序,还是需要使用多进程。

GIL全局解释器锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
python解释器其实有多个版本
Cpython
Jpython
Pypypython
但是普遍使用的都是CPython解释器

在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
同一个进程下的多个线程无法利用多核优势!!!
疑问:python的多线程是不是一点用都没有???无法利用多核优势

因为cpython中的内存管理不是线程安全的
内存管理(垃圾回收机制)
1.应用计数
2.标记清楚
3.分代回收

"""

"""
重点:
1.GIL不是python的特点而是CPython解释器的特点
2.GIL是保证解释器级别的数据的安全
3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)
4.针对不同的数据还是需要加不同的锁处理
5.解释型语言的通病:同一个进程下多个线程无法利用多核优势

GIL与普通互斥锁的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from threading import Thread,Lock
import time


mutex = Lock()
money = 100


def task():
global money
# with mutex:
# tmp = money
# time.sleep(0.1)
# money = tmp -1
mutex.acquire()
tmp = money
time.sleep(0.1) # 只要你进入IO了 GIL会自动释放
money = tmp - 1
mutex.release()


if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)



"""
100个线程起起来之后 要先去抢GIL
我进入io GIL自动释放 但是我手上还有一个自己的互斥锁
其他线程虽然抢到了GIL但是抢不到互斥锁
最终GIL还是回到你的手上 你去操作数据
"""

进程池与线程池

先回顾之前TCP服务端实现并发的效果是怎么玩的

每来一个人就开设一个进程或者线程去处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
"""
无论是开设进程也好还是开设线程也好 是不是都需要消耗资源
只不过开设线程的消耗比开设进程的稍微小一点而已

我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
硬件的开发速度远远赶不上软件呐

我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
"""
# 池的概念
"""
什么是池?
池是用来保证计算机硬件安全的情况下最大限度的利用计算机
它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行
"""

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import os


# pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
pool = ProcessPoolExecutor(5)
# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
"""
池子造出来之后 里面会固定存在五个线程
这个五个线程不会出现重复创建和销毁的过程
池子造出来之后 里面会固定的几个进程
这个几个进程不会出现重复创建和销毁的过程

池子的使用非常的简单
你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
"""


def task(n):
print(n,os.getpid())
time.sleep(2)
return n**n

def call_back(n):
print('call_back>>>:',n.result())
"""
任务的提交方式
同步:提交任务之后原地等待任务的返回结果 期间不做任何事
异步:提交任务之后不等待任务的返回结果 执行继续往下执行
返回结果如何获取???
异步提交任务的返回结果 应该通过回调机制来获取
回调机制
就相当于给每个异步任务绑定了一个定时炸弹
一旦该任务有结果立刻触发爆炸
"""
if __name__ == '__main__':
# pool.submit(task, 1) # 朝池子中提交任务 异步提交
# print('主')
t_list = []
for i in range(20): # 朝池子中提交20个任务
# res = pool.submit(task, i) # <Future at 0x100f97b38 state=running>
res = pool.submit(task, i).add_done_callback(call_back)
# print(res.result()) # result方法 同步提交
# t_list.append(res)
# 等待线程池中所有的任务执行完毕之后再继续往下执行
# pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕
# for t in t_list:
# print('>>>:',t.result()) # 肯定是有序的
"""
程序有并发变成了串行
任务的为什么打印的是None
res.result() 拿到的就是异步提交的任务的返回结果
"""

总结

1
2
3
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(task, i).add_done_callback(call_back)

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!