提交 7761eee3 编写于 作者: M mxd

rest服务增加异步接口

上级 2a4c1ce2
......@@ -42,6 +42,10 @@ public class SpiderFlowThreadPoolExecutor {
});
}
public Future<?> submit(Runnable runnable){
return this.executor.submit(runnable);
}
/**
* 创建子线程池
......
......@@ -54,7 +54,7 @@ public class Spider {
@Autowired
private FlowNoticeService flowNoticeService;
private static SpiderFlowThreadPoolExecutor threadPoolExecutor;
public static SpiderFlowThreadPoolExecutor executorInstance;
private static final String ATOMIC_DEAD_CYCLE = "__atomic_dead_cycle";
......@@ -62,7 +62,7 @@ public class Spider {
@PostConstruct
private void init() {
threadPoolExecutor = new SpiderFlowThreadPoolExecutor(totalThreads);
executorInstance = new SpiderFlowThreadPoolExecutor(totalThreads);
}
public List<SpiderOutput> run(SpiderFlow spiderFlow, SpiderContext context, Map<String, Object> variables) {
......@@ -120,7 +120,7 @@ public class Spider {
submitStrategy = new RandomThreadSubmitStrategy();
}
//创建子线程池,采用一父多子的线程池,子线程数不能超过总线程数(超过时进入队列等待),+1是因为会占用一个线程用来调度执行下一级
SubThreadPoolExecutor pool = threadPoolExecutor.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy);
SubThreadPoolExecutor pool = executorInstance.createSubThreadPoolExecutor(Math.max(nThreads,1) + 1,submitStrategy);
context.setRootNode(root);
context.setThreadPool(pool);
//触发监听器
......
......@@ -21,7 +21,6 @@ public class Tokenizer {
stream.startSpan();
RuntimeException re = null;
// TODO: this will fall on its face if we have something like {{ "}}" }}.
while (stream.hasMore()) {
if (stream.match("${", false)) {
if (!stream.isSpanEmpty()) tokens.add(new Token(TokenType.TextBlock, stream.endSpan()));
......
......@@ -55,7 +55,9 @@ public class SpiderJob extends QuartzJobBean {
}
JobDataMap dataMap = context.getMergedJobDataMap();
SpiderFlow spiderFlow = (SpiderFlow) dataMap.get(SpiderJobManager.JOB_PARAM_NAME);
run(spiderFlow, context.getNextFireTime());
if("1".equalsIgnoreCase(spiderFlow.getEnabled())){
run(spiderFlow, context.getNextFireTime());
}
}
public void run(String id) {
......@@ -63,13 +65,17 @@ public class SpiderJob extends QuartzJobBean {
}
public void run(SpiderFlow spiderFlow, Date nextExecuteTime) {
SpiderJobContext context = null;
Date now = new Date();
Task task = new Task();
task.setFlowId(spiderFlow.getId());
task.setBeginTime(new Date());
taskService.save(task);
run(spiderFlow,task,nextExecuteTime);
}
public void run(SpiderFlow spiderFlow, Task task,Date nextExecuteTime) {
SpiderJobContext context = null;
Date now = new Date();
try {
taskService.save(task);
context = SpiderJobContext.create(this.workspace, spiderFlow.getId(),task.getId(),false);
SpiderContextHolder.set(context);
contextMap.put(task.getId(), context);
......
......@@ -11,6 +11,7 @@ import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spiderflow.core.Spider;
import org.spiderflow.core.model.SpiderFlow;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -70,9 +71,9 @@ public class SpiderJobManager {
}
public void run(String id){
new Thread(()->{
Spider.executorInstance.submit(()->{
spiderJob.run(id);
}).start();
});
}
public boolean remove(String id){
......
......@@ -2,10 +2,14 @@ package org.spiderflow.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spiderflow.context.SpiderContext;
import org.spiderflow.core.Spider;
import org.spiderflow.core.job.SpiderJob;
import org.spiderflow.core.job.SpiderJobContext;
import org.spiderflow.core.model.SpiderFlow;
import org.spiderflow.core.model.Task;
import org.spiderflow.core.service.SpiderFlowService;
import org.spiderflow.core.service.TaskService;
import org.spiderflow.model.JsonBean;
import org.spiderflow.model.SpiderOutput;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -15,6 +19,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.List;
import java.util.Map;
......@@ -32,14 +37,75 @@ public class SpiderRestController {
@Value("${spider.workspace}")
private String workspace;
@Autowired
private SpiderJob spiderJob;
@Autowired
private TaskService taskService;
/**
* 异步运行
* @param id
* @return
*/
@RequestMapping("/runAsync/{id}")
public JsonBean<Integer> runAsync(@PathVariable("id")String id){
SpiderFlow flow = spiderFlowService.getById(id);
if(flow == null){
return new JsonBean<>(0, "找不到此爬虫信息");
}
Task task = new Task();
task.setFlowId(flow.getId());
task.setBeginTime(new Date());
taskService.save(task);
Spider.executorInstance.submit(()->{
spiderJob.run(flow,task,null);
});
return new JsonBean<>(task.getId());
}
/**
* 停止运行任务
* @param taskId
*/
@RequestMapping("/stop/{taskId}")
public JsonBean<Void> stop(@PathVariable("taskId")Integer taskId){
SpiderContext context = SpiderJob.getSpiderContext(taskId);
if(context == null){
return new JsonBean<>(0,"任务不存在!");
}
context.setRunning(false);
return new JsonBean<>(1,"停止成功!");
}
/**
* 查询任务状态
* @param taskId
*/
@RequestMapping("/status/{taskId}")
public JsonBean<Integer> status(@PathVariable("taskId")Integer taskId){
SpiderContext context = SpiderJob.getSpiderContext(taskId);
if(context == null){
return new JsonBean<>(0); //
}
return new JsonBean<>(1); //正在运行中
}
/**
* 同步运行
* @param id
* @param params
* @return
*/
@RequestMapping("/run/{id}")
public JsonBean<List<SpiderOutput>> run(@PathVariable("id")String id,@RequestBody(required = false)Map<String,Object> params){
SpiderFlow flow = spiderFlowService.getById(id);
if(flow == null){
return new JsonBean<>(0, "找不到此爬虫信息");
}
List<SpiderOutput> outputs = null;
List<SpiderOutput> outputs;
Integer maxId = spiderFlowService.getFlowMaxTaskId(id);
SpiderJobContext context = SpiderJobContext.create(workspace, id,maxId,true);
try{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册