
Task APP Project Initialization

A task APP project can be initialized on top of a regular APP project. The parts that need to be modified are described below:

Description of Modifications

The directory structure and environment need no changes; only the following files are modified:

1. app/__init__.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import os
from flask_cors import CORS
from flask import Flask, render_template
from hippo.web import AppInit
from hippo.web import exception_handle_register
from hippo.web import blueprints_dynamic_register
from hippo.ext import celery_init_app # change here

__VERSION = (2, 0, 0)
__VERSION__ = ".".join(map(lambda x: str(x), __VERSION))


def create_app():
    app = Flask(__name__)

    AppInit().prepare(app)
    blueprints_dynamic_register(os.path.dirname(__file__), app)

    # r'/*' is a wildcard that allows cross-origin requests for every URL on this server
    CORS(app, resources=r'/*')

    celery_init_app(app) # change here

    @app.route('/', methods=['GET'])
    @app.route('/tool_detail', methods=['GET'])
    def index():
        return render_template('index.html')

    exception_handle_register(app)
    return app

This adds the Celery initialization; Celery is a task scheduling (distributed task queue) framework.

2. wsgi.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import logging
from app import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"] # change here

if __name__ == '__main__':
    flask_app.run(
        host=flask_app.config.get("HOST"),
        port=flask_app.config.get("PORT"),
        debug=flask_app.config.get("DEBUG")
    )
else:
    # When started under gunicorn, bind the Flask app's logging to gunicorn's log configuration
    gunicorn_logger = logging.getLogger('gunicorn.error')
    flask_app.logger.handlers = gunicorn_logger.handlers
    flask_app.logger.setLevel(gunicorn_logger.level)

This adds the celery_app handle for the Celery framework to use.

3. app/views/ms_jk/submit.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from app.views.ms_jk import bp
from app.task.a_demo_task import a_demo_task # change here
from hippo.web import DataAnalysisSubmit
from hippo.log_tracking import platform_monitor


@bp.route("/submit", methods=['POST'])
@platform_monitor()
def submit():
    das = DataAnalysisSubmit()
    return das.submit_task(a_demo_task) # change here

This changes the submission method: instead of calling .submit(), we call .submit_task(a_demo_task). The argument to submit_task is a callable; inside that function, the keyword arguments must be passed through to the TaskParam class so that the task can retrieve the relevant parameters as it runs. This is only a brief introduction; a detailed explanation follows later.
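DataAnalysisSubmit is part of hippo and its internals are not shown. Purely to illustrate the parameter pass-through described above, here is a hypothetical stand-in (all names invented) showing how the submitter's keyword arguments reach the task unchanged:

```python
import uuid


class DemoSubmit:
    """Hypothetical stand-in for DataAnalysisSubmit (names invented)."""

    def __init__(self, inputs_json=None, output="/tmp/output"):
        # Parameters that would normally be collected from the HTTP request
        self.params = {
            "task_id": str(uuid.uuid4()),
            "inputs_json": inputs_json or {},
            "output": output,
        }

    def submit_task(self, task):
        # With Celery this would be task.delay(**self.params) to enqueue
        # asynchronously; calling directly here shows that the kwargs
        # arrive in the task untouched, ready for TaskParam(**kwargs).
        result = task(**self.params)
        return {"task_id": self.params["task_id"], "result": result}
```

The task then reconstructs its context from those kwargs with TaskParam(**kwargs), as the a_demo_task example below does.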

It is recommended to place task functions (a_demo_task in this example) under the app/task directory. Example code:

app/task/a_demo_task.py (create the task directory first, then create __init__.py and a_demo_task.py inside it)

$ mkdir -p app/task
$ touch app/task/__init__.py
$ touch app/task/a_demo_task.py
$ tree app/task
app/task
├── a_demo_task.py
└── __init__.py

0 directories, 2 files

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from celery import shared_task, current_task
from hippo.web import TaskParam
from hippo.log_tracking import platform_monitor, TASK


@shared_task(ignore_result=False)
@platform_monitor(TASK) # platform monitoring decorator; task-style usage must pass the TASK enum for identification
def a_demo_task(**kwargs) -> dict:
    task_param = TaskParam(**kwargs)
    try:
        # Define output paths
        output = task_param.output  # directory where this task's commands run (the output directory under the output root)
        log_path = task_param.path_for_dir('log')  # the log directory under the output root
        inputs_json = dict(task_param.inputs_json)  # parameters configured in the front-end left panel
        input_file = inputs_json.get('input_file')

        # Sleep 5s to simulate a long-running task; fill in real business code here
        task_param.add(scripts="sleep")
        task_param.add(value='5')
        task_param.run_scripts(history_files=[])

        task_param.update_status(task_param.STATUS.succeeded)  # report the task status back to the platform
        return {"status": "Task success!"}
    except Exception as e:
        # update status
        task_param.update_status(task_param.STATUS.failed)
        return {"status": "Task failed!", "msg": str(e)}

Configuring Project Parameters

On top of the regular APP project's configuration file, a task APP needs some additional settings, described below:

REDIS_HOST = "192.168.0.147"
REDIS_PORT = 6379
REDIS_DB = 7
REDIS_EXPIRE = 24 * 60 * 60  # Redis expiry: 24 hours, in seconds

# Celery configuration
CELERY_BROKER_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_RESULT_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
  • CELERY_BROKER_URL: the Celery message broker
  • CELERY_RESULT_BACKEND: the Celery result storage backend
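If the Redis instance requires authentication, the standard redis:// URL scheme accepts an optional password segment. A small helper (hypothetical, not part of hippo) that builds either form:

```python
def redis_url(host: str, port: int, db: int, password: str = "") -> str:
    # Celery accepts redis://[:password@]host:port/db for both
    # broker_url and result_backend
    auth = f":{password}@" if password else ""
    return f"redis://{auth}{host}:{port}/{db}"


CELERY_BROKER_URL = redis_url("192.168.0.147", 6379, 7)
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
```

Using a dedicated DB index (7 here) keeps Celery's keys separate from any other data stored in the same Redis instance.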

Startup and Initial Verification

Unlike a regular APP, a task APP must start the Celery worker in addition to the web service.

1. Start the service

$ python wsgi.py
 * Serving Flask app 'app' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.0.114:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug: * Restarting with stat
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 100-862-322

Verify: if version information is returned, the application and environment were set up successfully.

curl http://192.168.0.114:5000/ms_jk/query_app_version
{
  "app_version": "1.0.0", 
  "web_version": "20241210", 
  "hippo_version": "1.6.15"
}

2. Start the Celery worker

Open a new terminal window and start the Celery worker; remember to switch to the application's environment first.

$ conda activate $PROJECT_NAME

$ celery -A wsgi worker -l info
 -------------- celery@vm v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-6.8.0-49-generic-x86_64-with-glibc2.35 2024-12-31 15:55:02
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x739741b3ded0
- ** ---------- .> transport:   redis://192.168.0.147:6379/7
- ** ---------- .> results:     redis://192.168.0.147:6379/7
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . app.task.a_demo_task.a_demo_task

[2024-12-31 15:55:02,974: INFO/MainProcess] Connected to redis://192.168.0.147:6379/7
[2024-12-31 15:55:02,985: INFO/MainProcess] mingle: searching for neighbors
[2024-12-31 15:55:04,007: INFO/MainProcess] mingle: all alone
[2024-12-31 15:55:04,061: INFO/MainProcess] celery@vm ready.

When app.task.a_demo_task.a_demo_task appears under [tasks] in the output, the Celery worker has started successfully.