1、先获取告警数据,并对告警数据进行处理,主要是清洗、去重、聚合等操作,这些都在后端实现
2、将prometheus的告警规则存到知识库里,这样做主要是为了让模型在查询告警相关指标数据时,生成的查询语句更准确
4、通过查询语句查询prometheus数据,这里也是后端实现,同样需要对查询到的数据进行清洗、去重、聚合,防止数据太多导致模型报错
1、不管是告警数据还是指标数据都需要注意数据量的问题,数据量太大超过模型上限就直接报错了
2、指标数据查询语句生成问题,这个我试过很多方式,感觉还是将prometheus的告警指标直接存到知识库,通过告警指标生成查询语句这种效果好一点
3、当进行聚合分析时,很多时候告警数据的量不会很大,但是查询到的指标数据的量很大,这种情况我是通过设置不同的step参数来控制返回的数据量
import datetime
import logging

import requests
from django.http import JsonResponse
from django.views.decorators.http import require_GET
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Prometheus range-query endpoint that every query is forwarded to.
# NOTE(review): "IP" is a placeholder host carried over from the original
# code; it has to be replaced with the real Prometheus address.
PROMETHEUS_QUERY_RANGE_URL = "http://IP:9090/api/v1/query_range"

# Metric labels copied into the trimmed response; all other labels are dropped.
_KEPT_METRIC_LABELS = ('pod', 'instance', 'namespace')


def _parse_namespace_filter(raw):
    """Return the set of namespaces listed in *raw*, or None if no filter was sent."""
    if not raw:
        return None
    return {ns.strip() for ns in raw.split(',') if ns.strip()}


def _has_positive_inf(series):
    """Return True when any sample of *series* has the string value "+Inf"."""
    # NOTE(review): only "+Inf" is screened out, as in the original code;
    # "-Inf" and "NaN" samples pass through — confirm this is intended.
    return any(
        len(pair) > 1 and pair[1] == "+Inf"
        for pair in (series.get('values') or ())
    )


def _format_timestamp(timestamp):
    """Render a Unix timestamp as a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    # Timezone-aware replacement for the deprecated utcfromtimestamp();
    # the formatted text is the same.
    moment = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%S')


def _simplify_series(series):
    """Reduce one result series to the kept labels and time/value sample dicts."""
    simplified = {}
    if 'metric' in series:
        labels = series['metric'] or {}
        simplified['metric'] = {
            name: labels[name] for name in _KEPT_METRIC_LABELS if name in labels
        }
    if 'values' in series:
        simplified['values'] = [
            {'time': _format_timestamp(pair[0]), 'value': pair[1]}
            for pair in (series['values'] or ())
            if len(pair) >= 2
        ]
    return simplified


@require_GET
def prometheus_query_range(request):
    """Forward range queries to Prometheus and return a trimmed, merged result.

    Query-string parameters:
        query: one or more Prometheus query expressions separated by ';'.
        start: range start, passed to Prometheus unchanged.
        end: range end, passed to Prometheus unchanged.
        step: query resolution step, defaults to '30m'.
        namespace: optional comma-separated namespaces; when given, only
            series whose ``namespace`` label is in the list are kept.

    Returns:
        JsonResponse with ``status`` 'success' and a ``data.result`` list that
        concatenates the processed series of every query. Each series keeps
        only the pod/instance/namespace labels, and its samples become
        ``{'time': 'YYYY-MM-DD HH:MM:SS', 'value': ...}`` dicts (time in UTC).
        Series containing a "+Inf" sample are dropped. Queries answered with
        a non-200 status are logged and skipped. A 400 response is returned
        when query/start/end is missing, and a 500 response on network or
        unexpected errors.
    """
    try:
        query = request.GET.get('query')
        start = request.GET.get('start')
        end = request.GET.get('end')
        step = request.GET.get('step', '30m')

        if not all([query, start, end]):
            return JsonResponse({
                'error': '缺少必要参数: query, start, end都是必需的'
            }, status=400)

        # None means "no filtering"; an empty set filters everything out.
        allowed_namespaces = _parse_namespace_filter(request.GET.get('namespace'))
        queries = [q.strip() for q in query.split(';') if q.strip()]

        all_results = []
        for q in queries:
            response = requests.get(
                PROMETHEUS_QUERY_RANGE_URL,
                params={'query': q, 'start': start, 'end': end, 'step': step},
                timeout=30,
            )

            if response.status_code != 200:
                # Best effort: a failed query is logged and skipped so the
                # remaining queries still contribute to the response.
                logger.warning(
                    "Prometheus API调用失败,状态码: %s,查询: %s",
                    response.status_code, q,
                )
                continue

            payload = response.json()
            data_section = payload.get('data') if isinstance(payload, dict) else None
            if not isinstance(data_section, dict) or 'result' not in data_section:
                continue
            # A null result list is treated the same as an empty one.
            series_list = data_section['result'] or []

            if allowed_namespaces is not None:
                # Series without a namespace label are dropped when filtering.
                series_list = [
                    series for series in series_list
                    if (series.get('metric') or {}).get('namespace') in allowed_namespaces
                ]

            all_results.extend(
                _simplify_series(series)
                for series in series_list
                if not _has_positive_inf(series)
            )

        final_response = {
            'status': 'success',
            'data': {
                'resultType': 'matrix',
                'result': all_results,
            },
        }
        return JsonResponse(final_response)

    except requests.exceptions.RequestException as e:
        # logger.exception keeps the traceback alongside the original message.
        logger.exception("请求Prometheus API时发生网络错误: %s", e)
        return JsonResponse({
            'error': '网络错误',
            'message': str(e)
        }, status=500)
    except Exception as e:
        logger.exception("处理Prometheus查询时发生错误: %s", e)
        return JsonResponse({
            'error': '服务器内部错误',
            'message': str(e)
        }, status=500)