1. 当前位置:网站首页 > Python

一个简单的Flask的接口案例


post请求 校验 入库

# coding=utf-8
import json
import logging
import config
import redis
from flask import Flask, request
from flask_cors import CORS
redis_ip = "127.0.0.1"
app = Flask(__name__)
CORS(app, supports_credentials=True)

# 配置修改接口
@app.route("/set_config", methods=["POST"])
def set_config():
    return_dict = {'return_code': '200', 'return_info': '修改成功', 'result': {}}
    # noinspection PyBroadException
    try:
        if request.form['fields']:
            r = redis.StrictRedis(host=redis_ip, port=6379, db=0)
            r.set("config", request.form['fields'])
        pass
    except Exception as e:
        return_dict["return_code"] = 500
        return_dict["return_info"] = "修改失败"
        logging.warning("修改接口调用失败 %s" % e)
    return json.dumps(return_dict, ensure_ascii=False)

# 配置修改接口
@app.route("/set_config_json", methods=["POST"])
def set_config_json():
    return_dict = {'return_code': '200',
                   'return_info': '修改成功', 'result': {}}
    # noinspection PyBroadException
    try:
        data = request.get_json()
        if data is not None and data != "":
            r = redis.StrictRedis(host=redis_ip, port=6379, db=0)
            r.set("config", json.dumps(data))
        else:
            return_dict["return_code"] = 500
            return_dict["return_info"] = "数据不能为空"
    except Exception as e:
        return_dict["return_code"] = 500
        return_dict["return_info"] = "修改失败"
        logging.warning("修改接口调用失败 %s" % e)
    return json.dumps(return_dict, ensure_ascii=False)

# 全局配置接口
@app.route("/base_config", methods=["POST"])
def base_config():
    return_dict = {'return_code': '200', 'return_info': '配置成功', 'result': {}}
    # noinspection PyBroadException
    try:
        if request.form['projectCode'] and request.form['deviceCode'] and request.form['KafkaAddress'] \
                and request.form['KafkaPort'] and request.form['TopicName'] and request.form['repeatTime']:
            topic_name = str(request.form['TopicName'])
            data = {
                "projectCode": request.form['projectCode'],
                "deviceCode": request.form['deviceCode'],
                "repeatTime": int(request.form['repeatTime'])
            }
            ip_list = request.form['KafkaAddress'].split(".")
            if len(ip_list) == 4:
                for num in ip_list:
                    if num.isdigit() and 0 <= int(num) <= 255 and 0 < int(request.form['KafkaPort']) < 999999:
                        data["KafkaAddress"] = request.form['KafkaAddress']
                        data["KafkaPort"] = request.form['KafkaPort']
                        data["TopicName"] = topic_name
                        r = redis.StrictRedis(host=redis_ip, port=6379, db=0)
                        r.set("base_config", json.dumps(data))
                    else:
                        return_dict["return_code"] = 502
                        return_dict["return_info"] = "设备ip或端口不正确"
            else:
                return_dict["return_code"] = 501
                return_dict["return_info"] = "设备ip不正确"

        else:
            return_dict["return_code"] = 503
            return_dict["return_info"] = "字段不满足或有空"
    except Exception as e:
        return_dict["return_code"] = 500
        return_dict["return_info"] = "配置失败"
        logging.warning("修改接口调用失败 %s" % e)
    return json.dumps(return_dict, ensure_ascii=False)

if __name__ == "__main__":
    # 启动项目
    app.debug = True
    app.run(host='0.0.0.0', port=8001)

本文最后更新于2022-1-12,已超过 3个月没有更新,如果文章内容或图片资源失效,请留言反馈,我们会及时处理,谢谢!
版权说明

本文地址:http://www.liuyangdeboke.cn/?post=10
未标注转载均为本站远程,转载请注明文章出处:

发表评论

联系我们

在线咨询:点击这里给我发消息

微信号:17721538135

工作日:9:00-23:00,节假日休息

扫码关注