Skip to content

任务APP业务代码开发

当业务代码需要执行很久或者相对耗费资源,此时就不适合在请求上下文中执行该逻辑了。 这时可以考虑使用任务APP的开发模式。 这里在之前任务APP项目初始化的基础上对其进行介绍。

首先需要理解此时的执行过程。界面上点击提交之后,会触发/ms_jk/submit的接口调用,此时在该接口逻辑中并没有执行业务逻辑, 而是提交了一个任务(即submit_task函数的参数)到celery队列。任务会在celery队列中排队,等待执行;而接口直接返回一个特殊状态, 告知前端此时是任务调用,前端会根据该状态轮询右侧配置的业务接口等待程序返回。

同常规APP一样,需要在平台配置该APP。配置与常规APP相同,这里无需修改(具体配置参考常规APP配置示例部分)

接口开发

开发接口 /ms_jk/sayhello,这里假定会有一个很耗时的任务生成一句励志语录到数据库里(这里使用文件代替)。 终端执行 vim app/views/ms_jk/sayhello.py, 编辑内容如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
from app.views.ms_jk import bp
from flask import request, jsonify
from hippo.web import DataAnalysisTableRecord
from hippo.log_tracking import platform_monitor

@bp.route("/sayhello", methods=['POST'])
@platform_monitor()
def sayhello():
    record = DataAnalysisTableRecord(__file__)

    name = record.get_input_param("name")
    start_symbols = record.get_plot_param("start_symbols",default="虚拟bot")
    if start_symbols is not None and not str(start_symbols).endswith(":"):
        start_symbols = f"{start_symbols}:"

    quote_file = f"{record.output}/quote.txt"
    if not os.path.exists(quote_file):
      raise Exception("任务执行异常")
    with open(quote_file,"r") as fh:
      quote = fh.read()

    return jsonify(data={'index': [0, 1], 'columns': ['Demo结果展示'], 'data': [[f"{start_symbols} hello,{name},{quote}!"]]}), 200


接口注册与常规APP方式相同,见app/views/ms_jk/__init__.py的编辑过程。

任务开发

首先将任务APP初始化里用于演示的任务函数删掉。使用更有意义的文件名和函数名:

$ rm app/task/a_demo_task.py
$ vim app/task/generate_quote_task.py

编辑内容如下:

# !/usr/bin/env python
# -*- coding:utf-8 -*
from celery import shared_task, current_task
from hippo.web import TaskParam
from hippo.log_tracking import platform_monitor, TASK
import time
import random

QUOTES = [
    "天行健,君子以自强不息;地势坤,君子以厚德载物。",
    "知者不惑,仁者不忧,勇者不惧。",
    "君子藏器于身,待时而动。"
]


@shared_task(ignore_result=False)
@platform_monitor(TASK)
def generate_quote_task(**kwargs) -> dict:
    task_param = TaskParam(**kwargs)
    try:
        # 定义 输出路径
        output = task_param.output
        inputs_json = dict(task_param.inputs_json)  # 前台左侧配置的相关参数
        name = inputs_json.get('name')  # 任务中可以获取到界面上配置的参数,如果需要的话这里可以使用这些参数完成后续逻辑

        print(f"开始为{name}生成励志语录...")
        time.sleep(5)
        quote = random.choice(QUOTES)
        print("生成语录成功,开始入库...")
        time.sleep(5)
        with open(f"{output}/quote.txt", "w") as fh:
            fh.write(quote)
        print("入库结束。")

        task_param.update_status(task_param.STATUS.succeeded)  # 更新任务状态到平台
        return {"status": "Task success!"}
    except Exception as e:
        # update status
        task_param.update_status(task_param.STATUS.failed)
        return {"status": "Task failed!", "msg": str(e)}

更新submit接口中提交的任务,编辑app/views/ms_jk/submit.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from app.views.ms_jk import bp
from hippo.web import DataAnalysisSubmit
from hippo.log_tracking import platform_monitor
from app.task.generate_quote_task import generate_quote_task # change here


@bp.route("/submit", methods=['POST'])
@platform_monitor
def submit():
    das = DataAnalysisSubmit()
    return das.submit_task(generate_quote_task) # change here

至此,任务APP开发完毕,启动一下看看效果

# 注意切换环境
$ python wsgi.py
 * Serving Flask app 'app' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://192.168.0.114:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug: * Restarting with stat
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 146-738-623

打开新终端,启动celery队列

# 注意切换环境
$ celery -A wsgi  worker -l info

 -------------- celery@vm v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- Linux-6.8.0-49-generic-x86_64-with-glibc2.35 2025-01-03 14:58:36
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x7cd06e735ed0
- ** ---------- .> transport:   redis://192.168.0.147:6379/7
- ** ---------- .> results:     redis://192.168.0.147:6379/7
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . app.task.generate_quote_task.generate_quote_task

[2025-01-03 14:58:36,295: INFO/MainProcess] Connected to redis://192.168.0.147:6379/7
[2025-01-03 14:58:36,302: INFO/MainProcess] mingle: searching for neighbors
[2025-01-03 14:58:37,313: INFO/MainProcess] mingle: all alon

出现 . app.task.generate_quote_task.generate_quote_task,表示celery已经识别到任务了,现在去平台该APP应用界面提交一下:

![任务APP界面访问结果展示] ./images/1735888442612.png

同时可以查看任务队列的输出查看执行过程是否符合预期:

![任务APP界面访问结果展示] ./images/1735888514540.png