分析python并发网络通信模型

编辑: admin 分类: python 发布时间: 2021-12-24 来源:互联网
目录
  • 一、常见模型分类
    • 1.1、循环服务器模型
    • 1.2、IO并发模型
    • 1.3、多进程/线程网络并发模型
  • 二、基于fork的多进程网络并发模型
    • 三、基于threading的多线程网络并发
      • 四、ftp 文件服务器
        • 4.1、项目功能
        • 4.2、整体结构设计
      • 五、IO并发
        • 5.1、IO分类
        • 5.2、IO多路复用
        • 5.3、位运算
        • 5.4、poll方法实现IO多路复用
        • 5.5、epoll方法

      一、常见模型分类

      1.1、循环服务器模型

      循环接收客户端请求,处理请求。同一时刻只能处理一个请求,处理完毕后再处理下一个。

      • 优点:实现简单,占用资源少
      • 缺点:无法同时处理多个客户端请求
      • 适用情况:处理的任务可以很快完成,客户端无需长期占用服务端程序。udp比tcp更适合循环。

      1.2、IO并发模型

      利用IO多路复用,异步IO等技术,同时处理多个客户端IO请求。

      • 优点 : 资源消耗少,能同时高效处理多个IO行为
      • 缺点 : 只能处理并发产生的IO事件,无法处理cpu计算
      • 适用情况:HTTP请求,网络传输等都是IO行为。

      1.3、多进程/线程网络并发模型

      每当一个客户端连接服务器,就创建一个新的进程/线程为该客户端服务,客户端退出时再销毁该进程/线程。

      • 优点:能同时满足多个客户端长期占有服务端需求,可以处理各种请求。
      • 缺点: 资源消耗较大
      • 适用情况:客户端同时连接量较少,需要处理行为较复杂情况。

      二、基于fork的多进程网络并发模型

      1.创建监听套接字

      2.等待接收客户端请求

      3.客户端连接创建新的进程处理客户端请求

      4.原进程继续等待其他客户端连接

      5.如果客户端退出,则销毁对应的进程

      from socket import *
      import os
      import signal
      
      # 创建监听套接字
      HOST = '0.0.0.0'
      PORT = 8888
      ADDR = (HOST,PORT)
      
      # 客户端服务函数
      def handle(c):
        while True:
          data = c.recv(1024)
          if not data:
            break
          print(data.decode())
          c.send(b'OK')
        c.close()
      
      s = socket()  # tcp套接字
      s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)   # 设置套接字端口重用
      s.bind(ADDR)
      s.listen(3)
      
      signal.signal(signal.SIGCHLD,signal.SIG_IGN)    # 处理僵尸进程
      
      print("Listen the port %d..." % PORT)
      
      # 循环等待客户端连接
      while True:
        try:
          c,addr = s.accept()
        except KeyboardInterrupt:
          os._exit(0)
        except Exception as e:
          print(e)
          continue
      
        # 创建子进程处理这个客户端
        pid = os.fork()
        if pid == 0:  # 处理客户端请求
          s.close()
          handle(c)
          os._exit(0)  # handle处理完客户端请求子进程也退出
      
        # 无论出错或者父进程都要循环回去接受请求
        # c对于父进程没用
        c.close()

      三、基于threading的多线程网络并发

      1.创建监听套接字

      2.循环接收客户端连接请求

      3.当有新的客户端连接创建线程处理客户端请求

      4.主线程继续等待其他客户端连接

      5.当客户端退出,则对应分支线程退出

      from socket import *
      from threading import Thread
      import sys
      
      # 创建监听套接字
      HOST = '0.0.0.0'
      PORT = 8888
      ADDR = (HOST,PORT)
      
      # 处理客户端请求
      def handle(c):
        while True:
          data = c.recv(1024)
          if not data:
            break
          print(data.decode())
          c.send(b'OK')
        c.close()
      
      s = socket()  # tcp套接字
      s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
      s.bind(ADDR)
      s.listen(3)
      
      print("Listen the port %d..."%PORT)
      # 循环等待客户端连接
      while True:
        try:
          c,addr = s.accept()
        except KeyboardInterrupt:
          sys.exit("服务器退出")
        except Exception as e:
          print(e)
          continue
      
        # 创建线程处理客户端请求
        t = Thread(target=handle, args=(c,))
        t.setDaemon(True)   # 父进程结束则所有进程终止
        t.start()

      四、ftp 文件服务器

      4.1、项目功能

      客户端有简单的页面命令提示:功能包含:

      • 查看服务器文件库中的文件列表(普通文件)
      • 可以下载其中的某个文件到本地
      • 可以上传客户端文件到服务器文件库

      服务器需求 :

      • 允许多个客户端同时操作
      • 每个客户端可能回连续发送命令

      技术分析:

      • tcp套接字更适合文件传输
      • 并发方案 ---》 fork 多进程并发
      • 对文件的读写操作获取
      • 文件列表 ----》 os.listdir()

      粘包的处理

      4.2、整体结构设计

      • 服务器功能封装在类中(上传,下载,查看列表)
      • 创建套接字,流程函数调用 main()
      • 客户端负责发起请求,接受回复,展示
      • 服务端负责接受请求,逻辑处理

      ftp server:

      from socket import *
      from threading import Thread
      import os
      import time
      
      # 全局变量
      HOST = '0.0.0.0'
      PORT = 8080
      ADDR = (HOST,PORT)
      FTP = "/home/tarena/FTP/"  # 文件库位置
      
      # 创建文件服务器服务端功能类
      class FTPServer(Thread):
        def __init__(self,connfd):
          self.connfd = connfd
          super().__init__()
      
        def do_list(self):
          # 获取文件列表
          files = os.listdir(FTP)
          if not files:
            self.connfd.send("文件库为空".encode())
            return
          else:
            self.connfd.send(b'OK')
            time.sleep(0.1)  # 防止和后面发送内容粘包
      
          # 拼接文件列表
          files_ = ""
          for file in files:
            if file[0] != '.' and \
                    os.path.isfile(FTP+file):
              files_ += file + '\n'
          self.connfd.send(files_.encode())
      
        def do_get(self,filename):
          try:
            fd = open(FTP+filename,'rb')
          except Exception:
            self.connfd.send("文件不存在".encode())
            return
          else:
            self.connfd.send(b'OK')
            time.sleep(0.1)
          # 文件发送
          while True:
            data = fd.read(1024)
            if not data:
              time.sleep(0.1)
              self.connfd.send(b'##')
              break
            self.connfd.send(data)
      
        # 循环接收客户端请求
        def run(self):
          while True:
            data = self.connfd.recv(1024).decode()
            if not data or data == 'Q':
              return 
            elif data == 'L':
              self.do_list()
            elif data[0] == 'G':   # G filename
              filename = data.split(' ')[-1]
              self.do_get(filename)
      
      # 网络搭建
      def main():
        # 创建套接字
        sockfd = socket()
        sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        sockfd.bind(ADDR)
        sockfd.listen(3)
        print("Listen the port %d..."%PORT)
        while True:
          try:
            connfd,addr = sockfd.accept()
            print("Connect from",addr)
          except KeyboardInterrupt:
            print("服务器程序退出")
            return
          except Exception as e:
            print(e)
            continue
      
          # 创建新的线程处理客户端
          client = FTPServer(connfd)
          client.setDaemon(True)
          client.start()   # 运行run方法
      
      
      if __name__ == "__main__":
        main()

      ftp client:

      from socket import *
      import sys
      
      ADDR = ('127.0.0.1',8080) # 服务器地址
      
      # 客户端功能处理类
      class FTPClient:
        def __init__(self,sockfd):
          self.sockfd = sockfd
      
        def do_list(self):
          self.sockfd.send(b'L')  # 发送请求
          # 等待回复
          data = self.sockfd.recv(128).decode()
          if data == 'OK':
            # 一次接收文件列表字符串
            data = self.sockfd.recv(4096)
            print(data.decode())
          else:
            print(data)
      
        def do_get(self,filename):
          # 发送请求
          self.sockfd.send(('G '+filename).encode())
          # 等待回复
          data = self.sockfd.recv(128).decode()
          if data == 'OK':
            fd = open(filename,'wb')
            # 接收文件
            while True:
              data = self.sockfd.recv(1024)
              if data == b'##':
                break
              fd.write(data)
            fd.close()
          else:
            print(data)
      
        def do_quit(self):
          self.sockfd.send(b'Q')
          self.sockfd.close()
          sys.exit("谢谢使用")
      
      # 创建客户端网络
      def main():
        sockfd = socket()
        try:
          sockfd.connect(ADDR)
        except Exception as e:
          print(e)
          return
      
        ftp = FTPClient(sockfd) # 实例化对象
      
        # 循环发送请求
        while True:
          print("\n=========命令选项==========")
          print("****      list         ****")
          print("****    get file       ****")
          print("****    put file       ****")
          print("****      quit         ****")
          print("=============================")
      
          cmd = input("输入命令:")
      
          if cmd.strip() == 'list':
            ftp.do_list()
          elif cmd[:3] == 'get':
            # get filename
            filename = cmd.strip().split(' ')[-1]
            ftp.do_get(filename)
          elif cmd[:3] == 'put':
            # put ../filename
            filename = cmd.strip().split(' ')[-1]
            ftp.do_put(filename)
          elif cmd.strip() == 'quit':
            ftp.do_quit()
          else:
            print("请输入正确命令")
      
      
      
      if __name__ == "__main__":
        main()

      五、IO并发

      定义:在内存中数据交换的操作被定义为IO操作,IO------输入输出

      内存和磁盘进行数据交换: 文件的读写 数据库更新

      内存和终端数据交换 :input print sys.stdin sys.stdout sys.stderr

      内存和网络数据的交换: 网络连接 recv send recvfrom

      IO密集型程序 : 程序执行中有大量的IO操作,而较少的cpu运算操作。消耗cpu较少,IO运行时间长

      CPU(计算)密集型程序:程序中存在大量的cpu运算,IO操作相对较少,消耗cpu大。

      5.1、IO分类

      IO分为:阻塞IO、非阻塞IO、IO多路复用、事件驱动IO、异步IO

      阻塞IO

      • 定义: 在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
      • 效率: 阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为。

      阻塞情况:

      • 因为某种执行条件没有满足造成的函数阻塞  e.g. accept input recv
      • 处理IO的时间较长产生的阻塞状态  e.g. 网络传输, 大文件读写

      非阻塞IO

      定义 : 通过修改IO属性行为, 使原本阻塞的IO变为非阻塞的状态。

      设置套接字为非阻塞IO

      • sockfd.setblocking(bool)
      • 功能: 设置套接字为非阻塞IO
      • 参数: 默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞

      超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。

      • sockfd.settimeout(sec)
      • 功能:设置套接字的超时时间
      • 参数:设置的时间

      5.2、IO多路复用

      定义 :通过一个监测,可以同时监控多个IO事件的行为。当哪个IO事件可以执行,即让这个IO事件发生。

      rs, ws, xs = select(rlist, wlist, xlist[, timeout])  监控IO事件,阻塞等待监控的IO时间发生

      参数 :

      • rlist列表,存放(被动)等待处理的IO (接收)
      • wlist列表,存放主动处理的IO(发送)
      • xlist列表,存放出错,希望去处理的IO(异常)
      • timeout 超时检测

      返回值:

      • rs列表rlist中准备就绪的IO
      • ws列表wlist中准备就绪的IO
      • xs列表xlist中准备就绪的IO

      select 实现tcp服务

      1.将关注的IO放入对应的监控类别列表

      2.通过select函数进行监控

      3.遍历select返回值列表,确定就绪IO事件

      4.处理发生的IO事件

      from socket import *
      from select import select
      
      # 创建一个监听套接字作为关注的IO
      s = socket()
      s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
      s.bind(('0.0.0.0',8888))
      s.listen(3)
      
      # 设置关注列表
      rlist = [s]
      wlist = []
      xlist = [s]
      
      # 循环监控IO
      while True:
        rs,ws,xs = select(rlist,wlist,xlist)
        # 遍历三个返回列表,处理IO
        for r in rs:
          # 根据遍历到IO的不同使用if分情况处理
          if r is s:
            c,addr = r.accept()
            print("Connect from",addr)
            rlist.append(c) # 增加新的IO事件
          # else为客户端套接字就绪情况
          else:
            data = r.recv(1024)
            # 客户端退出
            if not data:
              rlist.remove(r) # 从关注列表移除
              r.close()
              continue # 继续处理其他就绪IO
            print("Receive:",data.decode())
            # r.send(b'OK')
            # 我们希望主动处理这个IO对象
            wlist.append(r)
      
        for w in ws:
          w.send(b'OK')
          wlist.remove(w) # 使用后移除
      
        for x in xs:
          pass

      注意:

      • wlist中如果存在IO事件,则select立即返回给ws
      • 处理IO过程中不要出现死循环占有服务端的情况
      • IO多路复用消耗资源较少,效率较高扩展:

      5.3、位运算

      将整数转换为二进制, 按照二进制位进行运算符操作
      & 按位与   | 按位或   ^ 按位异或   << 左移 >> 右移
      11 1011    14 1110
      (11 & 14 1010)   (11| 14 1111)  (11^ 14 0101)
      11 << 2 ===> 44 右侧补0    14 >> 2 ===> 3 挤掉右侧的数字

      使用 :

      • 在做底层硬件时操作寄存器
      • 做标志位的过滤

      5.4、poll方法实现IO多路复用

      创建poll对象:p = select.poll()

      注册关注的IO事件:p.register(fd,event)

      • fd 要关注的IO
      • event 要关注的IO事件类型

      常用类型:

      • POLLIN 读IO事件(rlist)
      • POLLOUT 写IO事件 (wlist)
      • POLLERR 异常IO (xlist)
      • POLLHUP 断开连接

      取消对IO的关注:p.unregister(fd)

      参数: IO对象或者IO对象的fileno

      events = p.poll():

      • 功能:   阻塞等待监控的IO事件发生
      • 返回值: 返回发生的IO事件

      events是一个列表 [(fileno,evnet),(),()....]

      每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型

      poll_server 步骤

      1.创建套接字

      2.将套接字register

      3.创建查找字典,并维护

      4.循环监控IO发生

      5.处理发生的IO

      from socket import *
      from select import *
      
      # 创建套接字
      s = socket()
      s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
      s.bind(('0.0.0.0',8888))
      s.listen(3)
      
      # 创建poll对象关注s
      p = poll()
      
      # 建立查找字典,用于通过fileno查找IO对象
      fdmap = {s.fileno():s}
      
      # 关注s
      p.register(s,POLLIN|POLLERR)
      
      # 循环监控
      while True:
        events = p.poll()
        # 循环遍历发生的事件 fd-->fileno
        for fd,event in events:
          # 区分事件进行处理
          if fd == s.fileno():
            c,addr = fdmap[fd].accept()
            print("Connect from",addr)
            # 添加新的关注IO
            p.register(c,POLLIN|POLLERR)
            fdmap[c.fileno()] = c # 维护字典
          # 按位与判定是POLLIN就绪
          elif event & POLLIN:
            data = fdmap[fd].recv(1024)
            if not data:
              p.unregister(fd) # 取消关注
              fdmap[fd].close()
              del fdmap[fd]  # 从字典中删除
              continue
            print("Receive:",data.decode())
            fdmap[fd].send(b'OK')

      5.5、epoll方法

      1. 使用方法 : 基本与poll相同

      • 生成对象改为 epoll()
      • 将所有事件类型改为EPOLL类型

      2. epoll特点

      • epoll 效率比select poll要高
      • epoll 监控IO数量比select要多
      • epoll 的触发方式比poll要多 (EPOLLET边缘触发)
      from socket import *
      from select import *
      
      # 创建套接字
      s = socket()
      s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
      s.bind(('0.0.0.0',8888))
      s.listen(3)
      
      # 创建epoll对象关注s
      ep = epoll()
      
      # 建立查找字典,用于通过fileno查找IO对象
      fdmap = {s.fileno():s}
      
      # 关注s
      ep.register(s,EPOLLIN|EPOLLERR)
      
      # 循环监控
      while True:
        events = ep.poll()
        # 循环遍历发生的事件 fd-->fileno
        for fd,event in events:
          print("亲,你有IO需要处理哦")
          # 区分事件进行处理
          if fd == s.fileno():
            c,addr = fdmap[fd].accept()
            print("Connect from",addr)
            # 添加新的关注IO
            # 将触发方式变为边缘触发
            ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
            fdmap[c.fileno()] = c # 维护字典
          # 按位与判定是EPOLLIN就绪
          # elif event & EPOLLIN:
          #   data = fdmap[fd].recv(1024)
          #   if not data:
          #     ep.unregister(fd) # 取消关注
          #     fdmap[fd].close()
          #     del fdmap[fd]  # 从字典中删除
          #     continue
          #   print("Receive:",data.decode())
          #   fdmap[fd].send(b'OK')

      以上就是分析python并发网络通信模型的详细内容,更多关于python 并发网络通信模型的资料请关注hwidc其它相关文章!

      【文章出处:美国cn2站群服务器 欢迎转载】