任务APP业务代码开发
当业务代码需要执行很久或者相对耗费资源,此时就不适合在请求上下文中执行该逻辑了。 这时可以考虑使用任务APP的开发模式。 这里在之前任务APP项目初始化的基础上对其进行介绍。
首先需要理解此时的执行过程。界面上点击提交之后,会触发/ms_jk/submit的接口调用,此时在该接口逻辑中并没有执行业务逻辑,
而是提交了一个任务(即submit_task函数的参数)到celery队列。任务会在celery队列中排队,等待执行;而接口直接返回一个特殊状态,
告知前端此时是任务调用,前端会根据该状态轮询右侧配置的业务接口等待程序返回。
同常规APP一样,需要在平台配置该APP。配置与常规APP相同,这里无需修改(具体配置参考常规APP配置示例部分)
接口开发
开发接口 /ms_jk/sayhello,这里假定会有一个很耗时的任务生成一句励志语录到数据库里(这里使用文件代替)。
终端执行 vim app/views/ms_jk/sayhello.py, 编辑内容如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
from app.views.ms_jk import bp
from flask import request, jsonify
from hippo.web import DataAnalysisTableRecord
from hippo.log_tracking import platform_monitor
@bp.route("/sayhello", methods=['POST'])
@platform_monitor()
def sayhello():
record = DataAnalysisTableRecord(__file__)
name = record.get_input_param("name")
start_symbols = record.get_plot_param("start_symbols",default="虚拟bot")
if start_symbols is not None and not str(start_symbols).endswith(":"):
start_symbols = f"{start_symbols}:"
quote_file = f"{record.output}/quote.txt"
if not os.path.exists(quote_file):
raise Exception("任务执行异常")
with open(quote_file,"r") as fh:
quote = fh.read()
return jsonify(data={'index': [0, 1], 'columns': ['Demo结果展示'], 'data': [[f"{start_symbols} hello,{name},{quote}!"]]}), 200
接口注册与常规APP方式相同,见app/views/ms_jk/__init__.py的编辑过程。
任务开发
首先将任务APP初始化里用于演示的任务函数删掉。使用更有意义的文件名和函数名:
$ rm app/task/a_demo_task.py
$ vim app/task/generate_quote_task.py
编辑内容如下:
# !/usr/bin/env python
# -*- coding:utf-8 -*
from celery import shared_task, current_task
from hippo.web import TaskParam
from hippo.log_tracking import platform_monitor, TASK
import time
import random
QUOTES = [
"天行健,君子以自强不息;地势坤,君子以厚德载物。",
"知者不惑,仁者不忧,勇者不惧。",
"君子藏器于身,待时而动。"
]
@shared_task(ignore_result=False)
@platform_monitor(TASK)
def generate_quote_task(**kwargs) -> dict:
task_param = TaskParam(**kwargs)
try:
# 定义 输出路径
output = task_param.output
inputs_json = dict(task_param.inputs_json) # 前台左侧配置的相关参数
name = inputs_json.get('name') # 任务中可以获取到界面上配置的参数,如果需要的话这里可以使用这些参数完成后续逻辑
print(f"开始为{name}生成励志语录...")
time.sleep(5)
quote = random.choice(QUOTES)
print("生成语录成功,开始入库...")
time.sleep(5)
with open(f"{output}/quote.txt", "w") as fh:
fh.write(quote)
print("入库结束。")
task_param.update_status(task_param.STATUS.succeeded) # 更新任务状态到平台
return {"status": "Task success!"}
except Exception as e:
# update status
task_param.update_status(task_param.STATUS.failed)
return {"status": "Task failed!", "msg": str(e)}
更新submit接口中提交的任务,编辑app/views/ms_jk/submit.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from app.views.ms_jk import bp
from hippo.web import DataAnalysisSubmit
from hippo.log_tracking import platform_monitor
from app.task.generate_quote_task import generate_quote_task # change here
@bp.route("/submit", methods=['POST'])
@platform_monitor
def submit():
das = DataAnalysisSubmit()
return das.submit_task(generate_quote_task) # change here
至此,任务APP开发完毕,启动一下看看效果
# 注意切换环境
$ python wsgi.py
* Serving Flask app 'app' (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: on
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://192.168.0.114:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug: * Restarting with stat
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 146-738-623
打开新终端,启动celery队列
# 注意切换环境
$ celery -A wsgi worker -l info
-------------- celery@vm v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-6.8.0-49-generic-x86_64-with-glibc2.35 2025-01-03 14:58:36
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x7cd06e735ed0
- ** ---------- .> transport: redis://192.168.0.147:6379/7
- ** ---------- .> results: redis://192.168.0.147:6379/7
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.task.generate_quote_task.generate_quote_task
[2025-01-03 14:58:36,295: INFO/MainProcess] Connected to redis://192.168.0.147:6379/7
[2025-01-03 14:58:36,302: INFO/MainProcess] mingle: searching for neighbors
[2025-01-03 14:58:37,313: INFO/MainProcess] mingle: all alon
出现 . app.task.generate_quote_task.generate_quote_task,表示celery已经识别到任务了,现在去平台该APP应用界面提交一下:
![任务APP界面访问结果展示] ./images/1735888442612.png
同时可以查看任务队列的输出查看执行过程是否符合预期:
![任务APP界面访问结果展示] ./images/1735888514540.png