使用SokectServer包做二次开发,顺带把日志类封装加了个信号处理方法也写一下吧
main.py
# coding=utf-8
import time
from lib import logger as logger_class, udpserver
from multiprocessing import Queue
# 多进程数据接收队列
data = Queue()
logger = logger_class.Logger(data).get_logger
if __name__ == '__main__':
logger.debug('启动了4D上位机')
# 多进程UDP服务对象
udp_server = udpserver.ProcessListener(12345, data)
udp_server.start()
# 需要延迟一秒判断
time.sleep(1)
if udp_server.get_state() is False:
print("UDP启动失败")
while True:
if not data.empty():
info = data.get()
else:
time.sleep(0.01)
# 强制终止进程, 做清理时调用.
# udp_server.stop_server()
logger.py
"""
日志封装类,导入即可直接使用
# 当前文件名 logger.py
"""
from functools import wraps
import os
import datetime
import loguru
# 单例类的装饰器
def singleton_class_decorator(cls):
"""
装饰器,单例类的装饰器
"""
# 在装饰器里定义一个字典,用来存放类的实例。
_instance = {}
# 装饰器,被装饰的类
@wraps(cls)
def wrapper_class(*args, **kwargs):
# 判断,类实例不在类实例的字典里,就重新创建类实例
if cls not in _instance:
# 将新创建的类实例,存入到实例字典中
_instance[cls] = cls(*args, **kwargs)
# 如果实例字典中,存在类实例,直接取出返回类实例
return _instance[cls]
# 返回,装饰器中,被装饰的类函数
return wrapper_class
@singleton_class_decorator
class Logger:
def __init__(self, data=None):
self.data = data
self.logger_add()
@staticmethod
def get_project_path(project_path=None):
if project_path is None:
# 当前项目文件的,绝对真实路径
# 路径,一个点代表当前目录,两个点代表当前目录的上级目录
project_path = os.path.realpath('.')
# 返回当前项目路径
return project_path
def get_log_path(self):
# 项目目录
project_path = self.get_project_path()
# 项目日志目录
project_log_dir = os.path.join(project_path, 'logs')
# 日志文件名
project_log_filename = 'runtime_{}.log'.format(datetime.date.today())
# 日志文件路径
project_log_path = os.path.join(project_log_dir, project_log_filename)
# 返回日志路径
return project_log_path
def logger_add(self):
loguru.logger.add(
# 水槽,分流器,可以用来输入路径
sink=self.get_log_path(),
# 日志创建周期,这里也可以配置成文件大小,例如10 MB
rotation='00:00',
# 保存
retention='1 year',
# 文件的压缩格式
compression='zip',
# 编码格式
encoding="utf-8",
# 具有使日志记录调用非阻塞的优点
enqueue=True,
# 配置日志等级,配置这里是只保存有用信息,debug信息剔除
level='INFO'
)
@property
def get_logger(self):
return loguru.logger.patch(self.msg_callback)
def msg_callback(self, record):
if self.data is not None:
msg_time = '{:%Y-%m-%d %H:%M:%S}'.format(record['time'])
msg = '%s[%s]:%s' % (msg_time, record['level'].name, record['message'])
info = {"type": "LOG", "data": msg}
self.data.put(info)
if __name__ == '__main__':
'''
# 实例化日志类
'''
"""
在其他.py文件中,只需要直接导入已经实例化的logger类即可
例如导入访视如下:
from project.logger import logger
logger = Logger().get_logger
logger.debug('调试代码')
logger.info('输出信息')
logger.success('输出成功')
logger.warning('错误警告')
logger.error('代码错误')
logger.critical('崩溃输出')
然后直接使用logger即可
"""
logger.info('----原始测试----')
udpserver.py
# coding=utf-8
import socketserver
import time
import multiprocessing
from lib import logger as logger_class
logger = logger_class.Logger().get_logger
class ProcessListener(multiprocessing.Process):
def __init__(self, port=8000, data=None):
super().__init__()
self.server = None
self.port = port
self.data = data
def run(self):
# noinspection PyBroadException
try:
self.start_server()
except Exception as e:
logger.warning("UDP启动出错%s" % e)
def start_server(self):
self.server = socketserver.ThreadingUDPServer(("0.0.0.0", self.port), UdpServer)
self.server.serve_forever()
def get_state(self):
time.sleep(1)
return self.is_alive()
def stop_server(self):
if self.get_state():
self.terminate()
class UdpServer(socketserver.BaseRequestHandler):
def handle(self):
# noinspection PyBroadException
try:
raw_data = self.request[0]
receive_ip, receive_port = self.client_address
# analysis.montage(receive_ip, receive_port, raw_data)
except Exception as e:
logger.warning("UDP接收出错%s" % e)
版权说明
本文地址:http://www.liuyangdeboke.cn/?post=40
未标注转载均为本站远程,转载请注明文章出处:
发表评论