前面的博客有讲到dify工作流调接口查询数据以及调用Dify的聊天助手API,此博客需具备以上基础。
背景:本人就职于某IT企业,当前公司正推进企业级AI应用的建设。基于微服务架构的业务复杂度较高,在技术选型中我们采用Dify工作流来实现核心需求。该方案的实现流程与此博客大体一致,可满足企业级应用的技术要求。
搭建步骤已省略,核心内容在下方。Dify的工作流具备很多组件,大家可以根据需求灵活使用,此次演示为Demo,真实场景比这个要复杂哈。
由于在工作流中使用http请求保存数据,需要使用postman验证该接口是否可用。
在发布前,需要运行或调试工作流是否正常。遇到问题不要慌,解决问题就是,大家可以加入技术群或个人微信交流~
有些人就会问,Dify不是都实现了吗,为啥还要编写java代码去对接了,前端不是能对接python接口吗?
自己也不确定,特意借助DeepSeek查了下,支持国产!
@RestController@RequestMapping("/workflow")public class WorkFlowController { @Autowired private WorkFlowService workFlowService;
* 上传文件 * * @param file * @return * @throws IOException */ @PostMapping("/upload") public WorkFlowFileVo upload(@RequestParam("file") MultipartFile file) throws IOException { return workFlowService.upload(file); }
* 执行工作流 * * @param workFlowRunDto * @return */ @PostMapping("/runWorkFlow") public SseEmitter runWorkFlow(@RequestBody WorkFlowRunDto workFlowRunDto) { return workFlowService.runWorkFlow(workFlowRunDto); }
* 获取工作流执行情况 * * @param workflowRunId * @return */ @GetMapping("/workFlowInfo") public WorkFlowExeVo workFlowRunInfo(String workflowRunId) { return workFlowService.workFlowRunInfo(workflowRunId); }
}
public interface WorkFlowService { * 上传文件 * * @param file * @return */ public WorkFlowFileVo upload(@RequestParam("file") MultipartFile file) throws IOException;
* 执行工作流 * * @param workFlowRunDto * @return */ public SseEmitter runWorkFlow(@RequestBody WorkFlowRunDto workFlowRunDto);
* 获取工作流执行情况 * * @param workflowRunId * @return */ public WorkFlowExeVo workFlowRunInfo(String workflowRunId);
}
* 上传文件 * * @param file * @return * @throws IOException */ @Override public WorkFlowFileVo upload(MultipartFile file) throws IOException { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.MULTIPART_FORM_DATA); headers.set("Authorization", difyConfig.getSaveDataAuthorization()); MultiValueMap<String, Object> body = new LinkedMultiValueMap<>(); body.add("file", new ByteArrayResource(file.getBytes()) { @Override public String getFilename() { return file.getOriginalFilename(); } });
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers); String uploadUrl = difyConfig.getSaveDataUrl() + "/files/upload";
ResponseEntity<String> response = restTemplate.exchange(uploadUrl, HttpMethod.POST, requestEntity, String.class); log.info("上传文件的response: {}", response);
WorkFlowFileVo workFlowFileVo = JSON.parseObject(response.getBody(), WorkFlowFileVo.class); WorkFlowRunDto workFlowRunDto = buildWorkFlowRunDto(workFlowFileVo.getId()); this.runWorkFlow(workFlowRunDto); }
* 执行工作流 * * @param workFlowRunDto * @return */ @Override public SseEmitter runWorkFlow(WorkFlowRunDto workFlowRunDto) { SseEmitter emitter = new SseEmitter(300_000L);
ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(() -> { try { String runUrl = difyConfig.getSaveDataUrl() + "/workflows/run"; log.info("runUrl: {}", runUrl);
HttpHeaders headers = new HttpHeaders(); headers.set("Authorization", difyConfig.getSaveDataAuthorization()); headers.setContentType(MediaType.APPLICATION_JSON); headers.set(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE); HttpEntity<WorkFlowRunDto> requestEntity = new HttpEntity<>(workFlowRunDto, headers);
restTemplate.execute( runUrl, HttpMethod.POST, request -> { request.getHeaders().setContentType(MediaType.APPLICATION_JSON); request.getHeaders().addAll(requestEntity.getHeaders());
if (requestEntity.getBody() != null) { new ObjectMapper().writeValue(request.getBody(), requestEntity.getBody()); } }, response -> { try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody()))) { boolean workflowRunIdProcessed = false; String line; while ((line = reader.readLine()) != null) { if (line.startsWith("event: ping")) { continue; } emitter.send(line); log.info("line: {}", line); if (!workflowRunIdProcessed) { try { workflowRunIdProcessed = processLine(line); } catch (Exception e) { log.error("记录id出现异常: {}", e.getMessage()); emitter.completeWithError(e); } } } } emitter.complete(); return null; } );
} catch (Exception e) { log.error("处理过程中发生错误: {}", e.getMessage()); emitter.completeWithError(e); } finally { log.info("流式输出结束..."); } }); executor.shutdown(); log.info("流式输出完成..."); return emitter; }
@Override public WorkFlowExeVo workFlowRunInfo(String workflowRunId) { log.info("获取到的工作流id: {}", workflowRunId);
HttpHeaders headers = new HttpHeaders(); headers.set("Authorization", difyConfig.getWorkFlowAuthorization());
String workFlowInfoUrl = difyConfig.getWorkFlowUrl() + "/workflows/run/" + workflowRunId;
HttpEntity<String> requestEntity = new HttpEntity<>(headers);
ResponseEntity<String> response = restTemplate.exchange(workFlowInfoUrl, HttpMethod.GET, requestEntity, String.class); log.info("response: {}", response);
return JSON.parseObject(response.getBody(), WorkFlowExeVo.class); }
有小伙伴就纳闷了,公司不是有测试吗,怎么还要自己测啊?
测试文件上传,由于Demo版代码执行文件与工作流执行在一块,如果基于业务需求,中间会进行其他操作。
建议大家在开发过程中规范记录日志,以便在系统出现异常时能快速定位问题根源。日志的详细程度可根据实际需求和开发经验灵活调整。
基于上述流程方案,我们已顺利完成了企业级应用需求的开发。