1. 当前位置:网站首页 > Python

基于多进程非堵塞UdpServer类


使用SokectServer包做二次开发,顺带把日志类封装加了个信号处理方法也写一下吧

main.py

# coding=utf-8
import time
from lib import logger as logger_class, udpserver
from multiprocessing import Queue
# 多进程数据接收队列
data = Queue()
logger = logger_class.Logger(data).get_logger

if __name__ == '__main__':
    logger.debug('启动了4D上位机')
    # 多进程UDP服务对象
    udp_server = udpserver.ProcessListener(12345, data)
    udp_server.start()
    # 需要延迟一秒判断
    time.sleep(1)
    if udp_server.get_state() is False:
        print("UDP启动失败")

    while True:
        if not data.empty():
            info = data.get()
        else:
            time.sleep(0.01)
        # 强制终止进程, 做清理时调用.
        # udp_server.stop_server()

logger.py

"""
 日志封装类,导入即可直接使用
# 当前文件名 logger.py
"""

from functools import wraps
import os
import datetime
import loguru

# 单例类的装饰器
def singleton_class_decorator(cls):
    """
    装饰器,单例类的装饰器
    """
    # 在装饰器里定义一个字典,用来存放类的实例。
    _instance = {}

    # 装饰器,被装饰的类
    @wraps(cls)
    def wrapper_class(*args, **kwargs):
        # 判断,类实例不在类实例的字典里,就重新创建类实例
        if cls not in _instance:
            # 将新创建的类实例,存入到实例字典中
            _instance[cls] = cls(*args, **kwargs)
        # 如果实例字典中,存在类实例,直接取出返回类实例
        return _instance[cls]

    # 返回,装饰器中,被装饰的类函数
    return wrapper_class

@singleton_class_decorator
class Logger:
    def __init__(self, data=None):
        self.data = data
        self.logger_add()

    @staticmethod
    def get_project_path(project_path=None):
        if project_path is None:
            # 当前项目文件的,绝对真实路径
            # 路径,一个点代表当前目录,两个点代表当前目录的上级目录
            project_path = os.path.realpath('.')
        # 返回当前项目路径
        return project_path

    def get_log_path(self):
        # 项目目录
        project_path = self.get_project_path()
        # 项目日志目录
        project_log_dir = os.path.join(project_path, 'logs')
        # 日志文件名
        project_log_filename = 'runtime_{}.log'.format(datetime.date.today())
        # 日志文件路径
        project_log_path = os.path.join(project_log_dir, project_log_filename)
        # 返回日志路径
        return project_log_path

    def logger_add(self):
        loguru.logger.add(
            # 水槽,分流器,可以用来输入路径
            sink=self.get_log_path(),
            # 日志创建周期,这里也可以配置成文件大小,例如10 MB
            rotation='00:00',
            # 保存
            retention='1 year',
            # 文件的压缩格式
            compression='zip',
            # 编码格式
            encoding="utf-8",
            # 具有使日志记录调用非阻塞的优点
            enqueue=True,
            # 配置日志等级,配置这里是只保存有用信息,debug信息剔除
            level='INFO'
        )

    @property
    def get_logger(self):
        return loguru.logger.patch(self.msg_callback)

    def msg_callback(self, record):
        if self.data is not None:
            msg_time = '{:%Y-%m-%d %H:%M:%S}'.format(record['time'])
            msg = '%s[%s]:%s' % (msg_time, record['level'].name, record['message'])
            info = {"type": "LOG", "data": msg}
            self.data.put(info)

if __name__ == '__main__':
    '''
    # 实例化日志类
    '''
    """
    在其他.py文件中,只需要直接导入已经实例化的logger类即可
    例如导入访视如下:
    from project.logger import logger
    logger = Logger().get_logger
    logger.debug('调试代码')
    logger.info('输出信息')
    logger.success('输出成功')
    logger.warning('错误警告')
    logger.error('代码错误')
    logger.critical('崩溃输出')
    然后直接使用logger即可
    """
    logger.info('----原始测试----')

udpserver.py

# coding=utf-8
import socketserver
import time
import multiprocessing
from lib import logger as logger_class
logger = logger_class.Logger().get_logger

class ProcessListener(multiprocessing.Process):

    def __init__(self, port=8000, data=None):
        super().__init__()
        self.server = None
        self.port = port
        self.data = data

    def run(self):
        # noinspection PyBroadException
        try:
            self.start_server()
        except Exception as e:
            logger.warning("UDP启动出错%s" % e)

    def start_server(self):
        self.server = socketserver.ThreadingUDPServer(("0.0.0.0", self.port), UdpServer)
        self.server.serve_forever()

    def get_state(self):
        time.sleep(1)
        return self.is_alive()

    def stop_server(self):
        if self.get_state():
            self.terminate()

class UdpServer(socketserver.BaseRequestHandler):

    def handle(self):
        # noinspection PyBroadException
        try:
            raw_data = self.request[0]
            receive_ip, receive_port = self.client_address
            # analysis.montage(receive_ip, receive_port, raw_data)
        except Exception as e:
            logger.warning("UDP接收出错%s" % e)

本文最后更新于2023-3-17,已超过 3个月没有更新,如果文章内容或图片资源失效,请留言反馈,我们会及时处理,谢谢!
版权说明

本文地址:http://www.liuyangdeboke.cn/?post=40
未标注转载均为本站远程,转载请注明文章出处:

发表评论

联系我们

在线咨询:点击这里给我发消息

微信号:17721538135

工作日:9:00-23:00,节假日休息

扫码关注