埋点数据是产品决策、用户行为分析和业务监控的重要依据。埋点数据的准确性和稳定性直接影响:
-
业务监控可靠性:核心指标如DAU、转化率等依赖埋点数据
-
产品决策准确性:A/B测试、功能迭代等基于埋点数据分析
-
用户体验保障:关键流程埋点缺失可能导致用户行为无法追踪
埋点数据异常可能原因是:
-
版本发布导致埋点漏上报/多上报
-
业务功能异常导致埋点未正常上报
因此,建立一套埋点巡检和波动告警机制,对保障数据质量、及时发现数据异常至关重要。本方案通过埋点的自动化巡检和实时告警,来保障业务质量的稳定性。
Python脚本实现:
使用request库请求内部数据平台接口获取埋点数据
计算每日埋点波动率
使用FastAPI创建外部接口
dify平台创建工作流&定时执行
通过HTTP调用外部接口
通过chapt 提取接口响应数据
通过钉钉机器人发送巡检结果

1. 获取数据平台的埋点数据(使用requests库发起接口请求)
2. 计算埋点数据波动(例如:计算取前天埋点/昨天埋点的波动率)
需要根据实际业务进行调整
def calculate_fluctuation_rate(self, data: Dict, point_names: List[str]) -> Dict:"""计算埋点波动率并识别异常"""results = {}try:if "data" not in data or not data["data"]:return {}# 按埋点名称和日期组织数据point_data = {}for item in data["data"]:point_name = item.get("point_name")if point_name not in point_names:continueif point_name not in point_data:point_data[point_name] = {}date_str = item.get("date")point_data[point_name][date_str] = {"pv": item.get("pv", 0),"uv": item.get("uv", 0)}# 计算每个埋点的波动率for point_name, dates in point_data.items():if len(dates) < 2:logger.warning(f"埋点 {point_name} 数据不足,无法计算波动率")continue# 按日期排序sorted_dates = sorted(dates.keys())latest_date = sorted_dates[-1]previous_date = sorted_dates[-2]latest_data = dates[latest_date]previous_data = dates[previous_date]# 计算PV和UV的波动率pv_change = self._calculate_change_rate(previous_data["pv"], latest_data["pv"])uv_change = self._calculate_change_rate(previous_data["uv"], latest_data["uv"])# 判断是否异常threshold = float(os.getenv("FLUCTUATION_THRESHOLD", 30.0))is_abnormal = (abs(pv_change) > threshold orabs(uv_change) > threshold orlatest_data["pv"] == 0 or # 零值异常latest_data["uv"] == 0)results[point_name] = {"latest_date": latest_date,"previous_date": previous_date,"pv_fluctuation": round(pv_change, 2),"uv_fluctuation": round(uv_change, 2),"latest_pv": latest_data["pv"],"latest_uv": latest_data["uv"],"previous_pv": previous_data["pv"],"previous_uv": previous_data["uv"],"is_abnormal": is_abnormal,"abnormal_reason": self._get_abnormal_reason(pv_change, uv_change,latest_data["pv"], latest_data["uv"],threshold) if is_abnormal else None}except Exception as e:logger.error(f"计算波动率时发生错误: {str(e)}")return resultsdef _calculate_change_rate(self, previous: int, current: int) -> float:"""计算变化率"""if previous == 0:return 100.0 if current > 0 else 0.0return ((current - previous) / previous) * 100def _get_abnormal_reason(self, pv_change: float, uv_change: float,current_pv: int, current_uv: int, threshold: float) -> str:"""获取异常原因"""reasons = []if current_pv == 0:reasons.append("PV为零")elif abs(pv_change) > threshold:reasons.append(f"PV波动({pv_change:.2f}%)超过阈值({threshold}%)")if current_uv == 0:reasons.append("UV为零")elif abs(uv_change) > threshold:reasons.append(f"UV波动({uv_change:.2f}%)超过阈值({threshold}%)")return "; ".join(reasons)
3. 使用 FastAPI 创建接口和服务 (具体细节可自行百度)
-
static 目录:主要存放静态文件
-
main.py -应用入口
-
包含定义接口路由
-
定义get方法的接口,例如:@app.get("/api/xxx")
@app.get("/api/data/test_api")def test_api(): return {"msg": "success"}
服务启动命令:
gunicorn -c gunicornconf.py api_server.main:app -k uvicorn.workers.UvicornWorker -n qa_tracking
其他关键文件:
-
服务启动文件:start_server.sh (启动服务的命令文件) -
服务配置文件:gunicornconf.py (配置服务信息包括端口、日志配置、内存优化配置等信息)
-
根据需要创建工作流,形成一个应用

-
dify支持接口调用
内部的定时任务可以通过接口调用执行dify应用

-
dify 自带执行日志

钉钉机器人报告


-
关于告警阀值:至于埋点告警值可以看日出工作日的波动情况进行初步评估,后续经过实际告警情况再进行调整 -
dify 内添加节点利用AI根据接口返回数据做出初步分析


