前面的博客有讲到dify工作流调接口查询数据以及调用Dify的聊天助手API,此博客需具备以上基础。
背景:本人就职于某IT企业,当前公司正推进企业级AI应用的建设。基于微服务架构的业务复杂度较高,在技术选型中我们采用Dify工作流来实现核心需求。该方案的实现流程与此博客大体一致,可满足企业级应用的技术要求。
搭建步骤已省略,核心内容在下方。Dify的工作流具备很多组件,大家可以根据需求灵活使用,此次演示为Demo,真实场景比这个要复杂哈。
由于在工作流中使用http请求保存数据,需要使用postman验证该接口是否可用。
在发布前,需要运行或调试工作流是否正常。遇到问题不要慌,解决问题就是,大家可以加入技术群或个人微信交流~
有些人就会问,Dify不是都实现了吗,为啥还要编写java代码去对接了,前端不是能对接python接口吗?
自己也不确定,特意借助DeepSeek查了下,支持国产!
@RestController
@RequestMapping("/workflow")
public class WorkFlowController {
@Autowired
private WorkFlowService workFlowService;
* 上传文件
*
* @param file
* @return
* @throws IOException
*/
@PostMapping("/upload")
public WorkFlowFileVo upload(@RequestParam("file") MultipartFile file) throws IOException {
return workFlowService.upload(file);
}
* 执行工作流
*
* @param workFlowRunDto
* @return
*/
@PostMapping("/runWorkFlow")
public SseEmitter runWorkFlow(@RequestBody WorkFlowRunDto workFlowRunDto) {
return workFlowService.runWorkFlow(workFlowRunDto);
}
* 获取工作流执行情况
*
* @param workflowRunId
* @return
*/
@GetMapping("/workFlowInfo")
public WorkFlowExeVo workFlowRunInfo(String workflowRunId) {
return workFlowService.workFlowRunInfo(workflowRunId);
}
}
public interface WorkFlowService {
* 上传文件
*
* @param file
* @return
*/
public WorkFlowFileVo upload(@RequestParam("file") MultipartFile file) throws IOException;
* 执行工作流
*
* @param workFlowRunDto
* @return
*/
public SseEmitter runWorkFlow(@RequestBody WorkFlowRunDto workFlowRunDto);
* 获取工作流执行情况
*
* @param workflowRunId
* @return
*/
public WorkFlowExeVo workFlowRunInfo(String workflowRunId);
}
* 上传文件
*
* @param file
* @return
* @throws IOException
*/
@Override
public WorkFlowFileVo upload(MultipartFile file) throws IOException {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
headers.set("Authorization", difyConfig.getSaveDataAuthorization());
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("file", new ByteArrayResource(file.getBytes()) {
@Override
public String getFilename() {
return file.getOriginalFilename();
}
});
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
String uploadUrl = difyConfig.getSaveDataUrl() + "/files/upload";
ResponseEntity<String> response = restTemplate.exchange(uploadUrl, HttpMethod.POST, requestEntity, String.class);
log.info("上传文件的response: {}", response);
WorkFlowFileVo workFlowFileVo = JSON.parseObject(response.getBody(), WorkFlowFileVo.class);
WorkFlowRunDto workFlowRunDto = buildWorkFlowRunDto(workFlowFileVo.getId());
this.runWorkFlow(workFlowRunDto);
}
* 执行工作流
*
* @param workFlowRunDto
* @return
*/
@Override
public SseEmitter runWorkFlow(WorkFlowRunDto workFlowRunDto) {
SseEmitter emitter = new SseEmitter(300_000L);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
try {
String runUrl = difyConfig.getSaveDataUrl() + "/workflows/run";
log.info("runUrl: {}", runUrl);
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", difyConfig.getSaveDataAuthorization());
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE);
HttpEntity<WorkFlowRunDto> requestEntity = new HttpEntity<>(workFlowRunDto, headers);
restTemplate.execute(
runUrl,
HttpMethod.POST,
request -> {
request.getHeaders().setContentType(MediaType.APPLICATION_JSON);
request.getHeaders().addAll(requestEntity.getHeaders());
if (requestEntity.getBody() != null) {
new ObjectMapper().writeValue(request.getBody(), requestEntity.getBody());
}
},
response -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getBody()))) {
boolean workflowRunIdProcessed = false;
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("event: ping")) {
continue;
}
emitter.send(line);
log.info("line: {}", line);
if (!workflowRunIdProcessed) {
try {
workflowRunIdProcessed = processLine(line);
} catch (Exception e) {
log.error("记录id出现异常: {}", e.getMessage());
emitter.completeWithError(e);
}
}
}
}
emitter.complete();
return null;
}
);
} catch (Exception e) {
log.error("处理过程中发生错误: {}", e.getMessage());
emitter.completeWithError(e);
} finally {
log.info("流式输出结束...");
}
});
executor.shutdown();
log.info("流式输出完成...");
return emitter;
}
@Override
public WorkFlowExeVo workFlowRunInfo(String workflowRunId) {
log.info("获取到的工作流id: {}", workflowRunId);
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", difyConfig.getWorkFlowAuthorization());
String workFlowInfoUrl = difyConfig.getWorkFlowUrl() + "/workflows/run/" + workflowRunId;
HttpEntity<String> requestEntity = new HttpEntity<>(headers);
ResponseEntity<String> response = restTemplate.exchange(workFlowInfoUrl, HttpMethod.GET, requestEntity, String.class);
log.info("response: {}", response);
return JSON.parseObject(response.getBody(), WorkFlowExeVo.class);
}
有小伙伴就纳闷了,公司不是有测试吗,怎么还要自己测啊?
测试文件上传,由于Demo版代码执行文件与工作流执行在一块,如果基于业务需求,中间会进行其他操作。
建议大家在开发过程中规范记录日志,以便在系统出现异常时能快速定位问题根源。日志的详细程度可根据实际需求和开发经验灵活调整。
基于上述流程方案,我们已顺利完成了企业级应用需求的开发。