三、Python-进程、线程(笔记)


返回

1、计算机基础

  1. 三部分
    • 软件
    • 操作系统

      提供硬件操作接口
      管理调度进程

    • 硬件

      cpu
      内存(速度快、断点数据消失)
      硬盘(速度慢、永久保存数据)

  2. 批处理系统
    • 串行执行程序

      一个一个按顺序执行程序

  3. 多道计数
    • 空间复用

      内存存储多个程序

    • 时间复用

      减少cpu等待时间(减少IO阻塞)

    • 并发执行

      看起来同时执行
      单核并发,多核并行

  4. 分时操作系统
    • 多个程序进程之间的内存互相隔离

2、进程

  1. 状态
    • 运行 --> 就绪 --> 运行
    • 运行 --> 阻塞 --> 就绪 --> 运行
  2. 使用方法
    1. multiprocessing模块.Process对象
      from multiprocessing import Process
      import os
      
      
      def func():
          print('子进程,ID:', os.getpid())  # 子进程,ID: 71980
      
      
      if __name__ == '__main__':
          t = Process(target=func, args=())
          t.start()  # 执行func函数
          print('主进程,ID:', os.getpid())  # 主进程,ID: 71978
      
    2. 自定义模块
      from multiprocessing import Process
      import os
      
      
      class MyProcess(Process):
          def __init__(self):
              super(MyProcess, self).__init__()
              print('init,ID:', os.getpid())  # init,ID: 72097
      
          def run(self):
              print('子进程,ID:', os.getpid())  # 子进程,ID: 72099
      
      
      if __name__ == '__main__':
          t = MyProcess()
          t.start()  # 执行run函数
          print('主进程,ID:', os.getpid())  # 主进程,ID: 72097
      
    3. 其它方法

      t.jion() # 等待子进程结束
      t.is_alive() # 判断进程是否结束
      t.pid # 当前进程ID
      t.terminate() # 结束进程
      t.name # 进程名

  3. 僵尸进程
    • 子进程被kill,父进程还在
    • 缺点:父进程不结束,僵尸进程占用系统PID号
  4. 孤儿进程
    • 子进程还在,父进程被kill
    • 被init进程监管,无害
  5. 进程之间内存隔离
    from multiprocessing import Process
    
    n = 10
    
    
    def func():
        global n
        n = 0
        print('子进程', n)
    
    
    if __name__ == '__main__':
        t = Process(target=func, )
        t.start()
        t.join()
        print('主进程', n)
    
  6. 守护进程
    • 父进程结束,守护进程也跟着结束
    • 守护进程内不能再开启子进程
    • 用法:t.daemon = Ture
      from multiprocessing import Process
      import time
      
      
      def func():
          for i in range(10):
              time.sleep(1)
              print('子进程', i)
      
      
      if __name__ == '__main__':
          t = Process(target=func, )
          t.daemon = True
          t.start()
          time.sleep(3)
          print('主进程')
      
  7. 互斥锁
    • 与join的区别

      互斥锁可以把代码局部锁定(如对共享数据的修改)
      join把整个进程锁定

    from multiprocessing import Process, Lock
    import time
    
    
    def func(mutex, i):
        mutex.acquire()  # 加锁
        for j in range(3):
            time.sleep(1)
            print('process %s:%s' % (i, j))
        mutex.release()  # 释放
    
    
    def process():
        mutex = Lock()
        for i in range(2):
            p = Process(target=func, args=(mutex, i))
            p.start()
    
    
    if __name__ == '__main__':
        process()
    
  8. 队列
    • q = Queue(3)
    • q.put(msg)
    • msg = q.get()
    • 生产者消费者模型,Queue
      from multiprocessing import Queue, Process
      import time
      
      
      def producer(q):
          for i in range(5):
              time.sleep(0.5)
              res = '包子%s' % i
              print('生产者生产了%s' % res)
              q.put(res)
      
      
      def customer(q):
          while True:
              time.sleep(0.3)
              res = q.get()
              if not res:
                  break
              print('消费者吃了%s' % res)
      
      
      if __name__ == '__main__':
      q = Queue()
      for i in range(3):
          p = Process(target=producer, args=(q,))
          p.start()
      pc = Process(target=customer, args=(q,))
      pc2 = Process(target=customer, args=(q,))
      pc.start()
      pc2.start()
      p.join()
      q.put(None)
      q.put(None)
      
    • 生产者消费者模型,JoinableQueue
      from multiprocessing import JoinableQueue, Process
      import time
      
      
      def producer(q):
          for i in range(5):
              time.sleep(0.5)
              res = '包子%s' % i
              print('生产者生产了%s' % res)
              q.put(res)
          q.join()  # 队列为空时,执行
      
      
      def customer(q):
          while True:
              time.sleep(0.3)
              res = q.get()
              print('消费者吃了%s' % res)
              q.task_done()  # 消费者吃完所有包子,触发q.join()
      
      
      if __name__ == '__main__':
          q = JoinableQueue()
          for i in range(3):
              p = Process(target=producer, args=(q,))
              p.start()
          pc = Process(target=customer, args=(q,))
          pc2 = Process(target=customer, args=(q,))
          pc.daemon = True  # 避免孤儿进程
          pc2.daemon = True
          pc.start()
          pc2.start()
          p.join()
      

3、线程

  1. 用法
    • threading模块.Thread类
      from threading import Thread
      
      
      def func(name):
          print(name)
      
      
      if __name__ == '__main__':
          t = Thread(target=func, args=('子线程',))
          t.start()
          print('主线程')
      
    • 自定义
      from threading import Thread
      
      
      class MyThread(Thread):
          def __init__(self, name):
              super().__init__()
              self.name = name
      
          def run(self):
              print(self.name)
      
      
      if __name__ == '__main__':
          t = MyThread('子线程')
          t.start()
          print('主线程')
      
  2. 进程与线程区别
    • 开进程的开销远大于线程
    • 同一进程内的多线程共享该进程的地址空间
      from threading import Thread
      from multiprocessing import Process
      
      n = 10
      
      
      def func(name):
          global n
          n = 0
          print(name, n)
      
      
      if __name__ == '__main__':
          # p = Process(target=func, args=('子进程',))
          # p.start()
          # p.join()
          t = Thread(target=func, args=('子线程',))
          t.start()
          t.join()
          print('主线程', n)
      
    • 同一进程内多线程PID相同
      from threading import Thread
      from multiprocessing import Process
      import os
      
      
      def func(name):
          print('子线程id:%s,父线程id:%s' % (os.getpid(), os.getppid()))
      
      
      if __name__ == '__main__':
          # p = Process(target=func, args=('子进程',))
          # p.start()
          # p.join()
          t = Thread(target=func, args=('子线程',))
          t.start()
          t.join()
          print('主线程id:%s'% os.getpid())
      
    • 补充(终端查看pid方法)

      Windows:tasklist |findstr python
      Linux:ps aux |grep python

  3. 其它方法
    from threading import Thread, current_thread, active_count, enumerate
    import os
    
    
    def func(name):
        pass
    
    
    if __name__ == '__main__':
        t = Thread(target=func, args=('子线程',))
        t.start()
        t.join()
        print('%s:%s' % (current_thread().getName(), os.getpid()))  # MainThread:4898
        current_thread().setName('主线程')
        print('%s:%s' % (current_thread().getName(), os.getpid()))  # 主线程:4898
        print(t.is_alive())  # False
        print(active_count())  # 1
        print(enumerate())  # [<_MainThread(主线程, started 4754021824)>]
    
  4. 守护线程
    • 所有非守护线程结束后,主线程才跟着结束
    • 主线程一旦结束,所有守护线程也跟着结束
    • 使用方法

      t.daemon = True
      t.setDaemon('Ture')

  5. 互斥锁
    from threading import Thread, Lock
    import time
    
    n = 100
    
    
    def a(lock):
        global n
        lock.acquire()
        temp = n
        time.sleep(0.0000001)
        n = temp - 1
        lock.release()
    
    
    if __name__ == '__main__':
        t_l = []
        lock = Lock()
        for i in range(100):
            t = Thread(target=a, args=(lock,))
            t.start()
            t_l.append(t)
        for t in t_l:
            t.join()
        print(n)  # n==0
    
  6. GIL锁
    • 保证同一个进程下的所有线程在同一时刻只有一个运行
      GIL.acquire()
      解释器的代码  # 保证python解释器垃圾回收机制是线程安全的
      GIL.release()
      
    • 只能并发,不能并行
  7. 计算密集型,推荐多进程
    from threading import Thread
    from multiprocessing import Process
    import time
    
    
    def func():
        n = 0
        for i in range(100000000):
            n += i
        print(n)
    
    
    if __name__ == '__main__':
        time_start = time.time()
        ll = []
        for i in range(2):
            t = Thread(target=func, )  # 线程14秒
            # t = Process(target=func,)  # 进程8秒
            t.start()
            ll.append(t)
        for t in ll:
            t.join()
        print(time.time() - time_start)
    
    
  8. IO密集型,推荐多线程
    from threading import Thread
    from multiprocessing import Process
    import time
    
    
    def func():
        time.sleep(1)
    
    
    if __name__ == '__main__':
        time_start = time.time()
        ll = []
        for i in range(50):
            t = Thread(target=func, )  # 线程1.01秒
            # t = Process(target=func,)  # 进程2.51秒
            t.start()
            ll.append(t)
        for t in ll:
            t.join()
        print(time.time() - time_start)
    
  9. 死锁
    • Lock的锁只能acquire一次
      from threading import Thread, Lock, current_thread
      import time
      
      mutex1 = Lock()
      mutex2 = Lock()
      
      
      def func():
          mutex1.acquire()
          print(1, current_thread().getName())
          mutex2.acquire()
          print(2, current_thread().getName())
          mutex2.release()
          mutex1.release()
      
          mutex2.acquire()
          print(2, current_thread().getName())
          time.sleep(0.1)
          mutex1.acquire()
          print(1, current_thread().getName())
          mutex1.release()
          mutex2.release()
      
      
      if __name__ == '__main__':
          for i in range(10):
              t = Thread(target=func, )
              t.start()
      
  10. 递归锁
    • RLock的锁可以acquire多次
      from threading import Thread, RLock, current_thread
      import time
      
      mutex2 = mutex1 = RLock()  # 同一把锁
      # mutex1 = RLock()  # 分开两把锁,还是会死锁
      # mutex2 = RLock()
      
      
      def func():
          mutex1.acquire()
          print(1, current_thread().getName())
          mutex2.acquire()
          print(2, current_thread().getName())
          mutex2.release()
          mutex1.release()
      
          mutex2.acquire()
          print(2, current_thread().getName())
          time.sleep(0.1)
          mutex1.acquire()
          print(1, current_thread().getName())
          mutex1.release()
          mutex2.release()
      
      
      if __name__ == '__main__':
          for i in range(10):
              t = Thread(target=func, )
              t.start()
      
  11. 信号量
    • 信号量也是一把锁,与互斥锁不同的是,信号量可以设置同时拿到锁的线程的个数
      from threading import Thread, Semaphore, current_thread
      import time, random
      
      sm = Semaphore(3)
      
      
      def func():
          # sm.acquire()
          # print(current_thread().getName())
          # time.sleep(random.randint(1,3))
          # sm.release()
          with sm:
              print(current_thread().getName())
              time.sleep(random.randint(1, 3))
      
      
      if __name__ == '__main__':
          for i in range(10):
              t = Thread(target=func, )
              t.start()
      
      
  12. event
    from threading import Thread, Event, current_thread
    import time
    
    event = Event()
    
    
    def conn():
        count = 0
        while not event.is_set():  # 判断event状态
            if count == 3:
                print('nooo')
                return
            print('waiting', current_thread().getName())
            event.wait(1)  # 等待set,可以设置时间参数
            count += 1
        print('okk', current_thread().getName())
    
    
    def check():
        print('checking', current_thread().getName())
        time.sleep(4)
        event.set()  # 激活event
    
    
    if __name__ == '__main__':
        for i in range(3):
            t = Thread(target=conn, )
            t.start()
        c = Thread(target=check)
        c.start()
    
  13. 定时器
    from threading import Timer
    import random
    
    
    class MakeQR:
        def __init__(self):
            self.make_cache()
    
        def make_cache(self):
            self.qr = self.make_qr()
            print(self.qr)
            self.t = Timer(3, self.make_cache, args=())  # 定时3秒,创建线程
            self.t.start()
    
        def make_qr(self):
            res = random.randint(1, 3)
            return str(res)
    
        def check(self):
            while True:
                inp = input('>>:')
                if inp == self.qr:
                    print('okk')
                    self.t.cancel()
                    break
    
    
    if __name__ == '__main__':
        obj = MakeQR()
        obj.check()
    

4、线程Queue

  1. 先进先出 - 队列Queue
    import queue
    
    q = queue.Queue(1)
    q.put(1)
    q.put_nowait(4)  # 等同于q.put(4, block=False)
    q.put(4, block=True)  # 锁住不报错
    q.put(4, block=False)  # 不锁报错
    q.put(4, block=True, timeout=2)  # 锁住2秒,2秒后报错
    
    print(q.get())
    print(q.get_nowait())  # 等同于q.get(block=False)
    print(q.get(block=True))  # 锁住不报错
    print(q.get(block=False))  # 不锁报错
    print(q.get(block=True, timeout=2))  # 锁住2秒,2秒后报错
    
  2. 后进先出 - 堆栈LifoQueue
    • q = queue.LifoQueue(3)
  3. 优先级队列 - PriorityQueue
    import queue
    
    q = queue.PriorityQueue(3)
    q.put((1, 'one'))  # 参数(优先级,msg),数字越小优先级越高
    q.put((3, 'two'))
    q.put((2, 'three'))
    print(q.get())
    print(q.get())
    print(q.get())
    

5、池

  1. 进程池
    • from concurrent.futures import ProcessPoolExecutor
    • pool = ProcessPoolExecutor(3)
    • pool.submit(run)
  2. 线程池
    • from concurrent.futures import ThreadPoolExecutor
    • pool = ThreadPoolExecutor(3)
    • pool.submit(run)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    print(os.getpid())
    time.sleep(2)


if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)  # 进程池
    # pool = ThreadPoolExecutor(3)  # 线程池
    for i in range(10):
        pool.submit(run)  # 不需要start
    pool.shutdown(wait=True)
    print('main')
  1. Queue方法实现线程池
    from threading import Thread
    from queue import Queue
    import os
    import time
    
    
    def run(q):
        print(os.getppid())
        time.sleep(2)
        q.get()  # 线程结束后,从队列取出一个标志
    
    
    if __name__ == '__main__':
        q = Queue(3)
        for i in range(10):
            t = Thread(target=run, args=(q,))
            q.put(1)  # 线程开始前,队列内新增一个标志,队列满则阻塞
            t.start()
        print('main')
    

6、同步异步

  1. 同步调用
    • 提交任务,直到等待任务完成后,才执行下一行代码,导致程序串行执行
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    print(os.getpid())
    time.sleep(2)


if __name__ == '__main__':
    # pool = ProcessPoolExecutor(3)  # 进程池
    pool = ThreadPoolExecutor(3)  # 线程池
    for i in range(10):
        pool.submit(run).result()  # 加result(),线程结束后才开下一个线程
    pool.shutdown(wait=True)
    print('main')
  1. 异步调用
    • 提交任务,不用等待任务结束,立即执行下一行代码,程序并行
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time


def run():
    print(os.getpid())
    time.sleep(2)
    return 'okk'


def func(r):  # 被回调的函数,参数为pool.submit(run)
    res = r.result()  # 拿到run函数的返回结果,代表线程结束,相当于pool.submit(run).result()
    print(res)


if __name__ == '__main__':
    # pool = ProcessPoolExecutor(3)  # 进程池
    pool = ThreadPoolExecutor(3)  # 线程池
    for i in range(10):
        pool.submit(run).add_done_callback(func)  # 回调函数,参数为函数名
    pool.shutdown(wait=True)
    print('main')
返回