Task APP Project Initialization
A task APP project is initialized on top of a regular APP project; the changes required are described below.
Overview of the changes
The directory structure and environment need no modification; only the following files change:
1. app/__init__.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os

from flask_cors import CORS
from flask import Flask, render_template
from hippo.web import AppInit
from hippo.web import exception_handle_register
from hippo.web import blueprints_dynamic_register
from hippo.ext import celery_init_app  # change here

__VERSION = (2, 0, 0)
__VERSION__ = ".".join(map(str, __VERSION))


def create_app():
    app = Flask(__name__)
    AppInit().prepare(app)
    blueprints_dynamic_register(os.path.dirname(__file__), app)
    # r'/*' is a wildcard: every URL on this server allows cross-origin requests
    CORS(app, resources=r'/*')
    celery_init_app(app)  # change here

    @app.route('/', methods=['GET'])
    @app.route('/tool_detail', methods=['GET'])
    def index():
        return render_template('index.html')

    exception_handle_register(app)
    return app
This adds the Celery initialization; Celery is a task-queue framework used here for task scheduling.
2. wsgi.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import logging

from app import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]  # change here

if __name__ == '__main__':
    flask_app.run(
        host=flask_app.config.get("HOST"),
        port=flask_app.config.get("PORT"),
        debug=flask_app.config.get("DEBUG")
    )
else:
    # When started via gunicorn, bind the Flask app's logging to gunicorn's log configuration
    gunicorn_logger = logging.getLogger('gunicorn.error')
    flask_app.logger.handlers = gunicorn_logger.handlers
    flask_app.logger.setLevel(gunicorn_logger.level)
This exposes a celery_app handle for the Celery worker to use.
3. app/views/ms_jk/submit.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from app.views.ms_jk import bp
from app.task.a_demo_task import a_demo_task  # change here
from hippo.web import DataAnalysisSubmit
from hippo.log_tracking import platform_monitor


@bp.route("/submit", methods=['POST'])
@platform_monitor()
def submit():
    das = DataAnalysisSubmit()
    return das.submit_task(a_demo_task)  # change here
This changes how the task is submitted: the .submit() call becomes .submit_task(a_demo_task). The argument to submit_task is a callable; inside the method, its parameters are passed through to the TaskParam class, so the task can look up whatever parameters it needs while it runs. That is all we cover here; the details are explained later.
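The internals of DataAnalysisSubmit and TaskParam belong to hippo and are not shown here, but the parameter passthrough can be pictured with a small stand-alone sketch (every name below is a hypothetical stand-in, not hippo's API):

```python
class FakeTaskParam:
    """Illustration-only stand-in for hippo's TaskParam."""

    def __init__(self, **kwargs):
        # The task reconstructs its parameters from the kwargs it receives.
        self.inputs_json = kwargs.get("inputs_json", {})
        self.output = kwargs.get("output", "")


def fake_submit_task(task_func, **request_params):
    # The real platform would queue the task, e.g. task_func.delay(**request_params);
    # calling the function directly keeps this sketch runnable.
    return task_func(**request_params)


def demo_task(**kwargs) -> dict:
    # Same shape as TaskParam(**kwargs) in the real task function.
    task_param = FakeTaskParam(**kwargs)
    return {"input_file": task_param.inputs_json.get("input_file")}
```

For example, fake_submit_task(demo_task, inputs_json={"input_file": "a.txt"}) forwards the request parameters to the task unchanged, which is the behavior submit_task relies on.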
The task function (a_demo_task in this example) is best placed under the app/task directory. Example code:
app/task/a_demo_task.py (create the task directory, then create __init__.py and a_demo_task.py inside it):
$ mkdir -p app/task
$ touch app/task/__init__.py
$ touch app/task/a_demo_task.py
$ tree app/task
app/task
├── a_demo_task.py
└── __init__.py
0 directories, 2 files
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import shared_task, current_task

from hippo.web import TaskParam
from hippo.log_tracking import platform_monitor, TASK


@shared_task(ignore_result=False)
@platform_monitor(TASK)  # platform-monitor decorator; task-style functions must pass the TASK enum
def a_demo_task(**kwargs) -> dict:
    task_param = TaskParam(**kwargs)
    try:
        # define the output paths
        output = task_param.output  # directory the task command runs in (the output dir under the output root)
        log_path = task_param.path_for_dir('log')  # the log dir under the output root
        inputs_json = dict(task_param.inputs_json)  # parameters configured in the front-end left panel
        input_file = inputs_json.get('input_file')
        # sleep 5s to simulate a long-running task; fill in real business code here
        task_param.add(scripts="sleep")
        task_param.add(value='5')
        task_param.run_scripts(history_files=[])
        task_param.update_status(task_param.STATUS.succeeded)  # report the task status to the platform
        return {"status": "Task success!"}
    except Exception as e:
        # update status
        task_param.update_status(task_param.STATUS.failed)
        return {"status": "Task failed!", "msg": str(e)}
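TaskParam's add()/run_scripts() calls assemble a shell command piece by piece and then execute it. A minimal stand-in makes the idea concrete (FakeTaskParam is hypothetical; hippo's real class also handles logging, history files, and status reporting):

```python
import subprocess


class FakeTaskParam:
    """Illustration-only stand-in for TaskParam's command assembly."""

    def __init__(self):
        self._argv = []

    def add(self, scripts=None, value=None):
        # Each add() appends one token: the executable first, then its arguments.
        if scripts is not None:
            self._argv.append(scripts)
        if value is not None:
            self._argv.append(value)

    def run_scripts(self):
        # The real run_scripts also writes logs and records history files.
        return subprocess.run(self._argv, capture_output=True, text=True, check=True)


param = FakeTaskParam()
param.add(scripts="echo")
param.add(value="task done")
result = param.run_scripts()
```

Under this reading, the add(scripts="sleep") / add(value='5') pair in the demo task simply builds and runs the command `sleep 5`.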
Project configuration
In addition to the regular APP project configuration, a task APP needs a few extra settings:
REDIS_HOST = "192.168.0.147"
REDIS_PORT = 6379
REDIS_DB = 7
REDIS_EXPIRE = 24 * 60 * 60  # redis key expiry: 24 hours, in seconds

# Celery configuration
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
- CELERY_BROKER_URL: the Celery message broker
- CELERY_RESULT_BACKEND: the Celery result backend
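Both URLs are plain Redis URLs built from the settings above. If your Redis instance requires a password (an assumption; the original config defines none, and REDIS_PASSWORD below is hypothetical), the URL gains an auth segment:

```python
# Sketch: building the Celery broker/backend URLs from the Redis settings.
REDIS_HOST = "192.168.0.147"
REDIS_PORT = 6379
REDIS_DB = 7
REDIS_PASSWORD = None  # hypothetical; set to a string if Redis has AUTH enabled

# redis://[:password@]host:port/db
auth = f":{REDIS_PASSWORD}@" if REDIS_PASSWORD else ""
CELERY_BROKER_URL = f"redis://{auth}{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
```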
Start and verify
Unlike a regular APP, a task APP needs the Celery worker started in addition to the web service.
1. Start the service
$ python wsgi.py
* Serving Flask app 'app' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://192.168.0.114:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug: * Restarting with stat
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 100-862-322
Verify: if version information is returned, the application and environment were set up successfully.
$ curl http://192.168.0.114:5000/ms_jk/query_app_version
{
    "app_version": "1.0.0",
    "web_version": "20241210",
    "hippo_version": "1.6.15"
}
2. Start the Celery worker
Open a new terminal window, switch to the application's environment, and start the Celery worker:
$ conda activate $PROJECT_NAME
$ celery -A wsgi worker -l info
-------------- celery@vm v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-6.8.0-49-generic-x86_64-with-glibc2.35 2024-12-31 15:55:02
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x739741b3ded0
- ** ---------- .> transport: redis://192.168.0.147:6379/7
- ** ---------- .> results: redis://192.168.0.147:6379/7
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.task.a_demo_task.a_demo_task
[2024-12-31 15:55:02,974: INFO/MainProcess] Connected to redis://192.168.0.147:6379/7
[2024-12-31 15:55:02,985: INFO/MainProcess] mingle: searching for neighbors
[2024-12-31 15:55:04,007: INFO/MainProcess] mingle: all alone
[2024-12-31 15:55:04,061: INFO/MainProcess] celery@vm ready.
When app.task.a_demo_task.a_demo_task is listed under [tasks] in the output, the Celery worker has started successfully.