JAVA多线程怎么实现用户任务排队并预估排队时长

技术JAVA多线程怎么实现用户任务排队并预估排队时长这篇文章主要介绍“JAVA多线程怎么实现用户任务排队并预估排队时长”,在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅

本文主要介绍“JAVA多线程如何对用户任务进行排队并估计队列长度”。在日常操作中,相信很多人对于JAVA多线程如何对用户任务进行排队,如何估计队列长度都有疑问。边肖查阅了各种资料,整理出简单易用的操作方法,希望能帮助大家解答“JAVA多线程如何对用户任务进行排队,估计队列长度”的疑惑!接下来,请和边肖一起学习!

JAVA多线程怎么实现用户任务排队并预估排队时长

实现流程

JAVA多线程怎么实现用户任务排队并预估排队时长

初始化一定数量的任务处理线程和缓存线程池,每次用户调用接口时,都会启动一个线程进行处理。

假设初始化了五个处理器,当代码执行BlockingQueue.take时,处理器队列将每次减少一个。当处理器队列为空时,take是阻塞线程。当用户处理完某个任务后,调用资源释放接口,在处理器队列中放入一个处理器对象,原来阻塞的take将继续执行。

排队论简介

排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,是运筹学的一个分支。让我们简化一下排队论。我们来看看下图:

JAVA多线程怎么实现用户任务排队并预估排队时长

00-1010任务队列初始化任务队列

import com . baomidou . mybatiplus . core . toolkit . collection utils;

import org . spring framework . stereotype.component;

import javax . annotation . post construct;

import Java . util . optional;

import Java . util . concurrent . blockingqueue;

import Java . util . concurrent . executorservice;

import Java . util . concurrent . executors;

import Java . util . concurrent . linkedblockingqueue;

import Java . util . concurrent . atomic . atomicinteger;

/**

*初始化队列和线程池

*@authortarzan

*

*/

@组件

publicclassTaskQueue{

//处理器队列

publicationstaticblockingqueuetaskprocessortasksprocessors;

//等待任务队列

publicationstaticblockingqueuecompiletaskswaittasks;

//处理任务队列

publicationstaticblockingqueuecompiletaskeexecute tasks;

//线程池

publicationstatexecutorserviceexec;

//处理器的初始数量(计算机cpu可用的线程数量)

publicationstatintegerprocessornum=runtime . getruntime()。available processors();

/**

*初始化处理器,等待任务,处理任务队列和线程池。

*/

@PostConstruct

n

bsp;  public static void initEquipmentAndUsersQueue(){
        exec = Executors.newCachedThreadPool();
        taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
        //将空闲的设备放入设备队列中
        setFreeDevices(processorNum);
        waitTasks =new LinkedBlockingQueue<CompileTask>();
        executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
    }
 
 
    /**
     * 将空闲的处理器放入处理器队列中
     */
    private static void setFreeDevices(int num) {
        //获取可用的设备
        for (int i = 0; i < num; i++) {
            TaskProcessor dc=new TaskProcessor();
            try {
                taskProcessors.put(dc);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
 
 
    public static CompileTask getWaitTask(Long clazzId) {
        return get(TaskQueue.waitTasks,clazzId);
    }
 
    public static CompileTask getExecuteTask(Long clazzId) {
        return get(TaskQueue.executeTasks,clazzId);
    }
 
 
    private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
        CompileTask compileTask =null;
        if (CollectionUtils.isNotEmpty(users)){
            Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
            if(optional.isPresent()){
                compileTask =  optional.get();
            }
        }
        return compileTask;
    }
 
    public static Integer getSort(Long clazzId) {
        AtomicInteger index = new AtomicInteger(-1);
        BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
        if (CollectionUtils.isNotEmpty(compileTasks)){
            compileTasks.stream()
                    .filter(e -> {
                        index.getAndIncrement();
                        return e.getClazzId().longValue() == clazzId.longValue();
                    })
                    .findFirst();
        }
        return index.get();
    }
 
    //单位秒
    public static int estimatedTime(Long clazzId){
        return  estimatedTime(60,getSort(clazzId)+1);
    }
 
    //单位秒
    public static int estimatedTime(int cellMs,int num){
         int a= (num-1)/processorNum;
         int b= cellMs*(a+1);
        return  b;
    }

编译任务类 CompileTask

import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;
 
import java.util.Date;
 
 
@Data
public class CompileTask implements Runnable {
    //当前请求的线程对象
    private Long clazzId;
    //用户id
    private Long userId;
    //当前请求的线程对象
    private Thread thread;
    //绑定处理器
    private TaskProcessor taskProcessor;
    //任务状态
    private Integer status;
    //开始时间
    private Date startTime;
    //结束时间
    private Date endTime;
 
    private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);
 
    private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);
 
    @Override
    public void run() {
        compile();
    }
 
    /**
     * 编译
     */
    public void compile() {
        try {
            //取出一个设备
            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
            //取出一个任务
            CompileTask compileTask = TaskQueue.waitTasks.take();
            //任务和设备绑定
            compileTask.setTaskProcessor(taskProcessor);
            //放入
            TaskQueue.executeTasks.put(compileTask);
            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
            //切换用户数据源
            dataSourceService.switchDataSource(userId);
            //添加进度
            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
        } catch (InterruptedException e) {
            System.err.println( e.getMessage());
        }
    }
 
}

任务处理器 TaskProcessor 

import lombok.Data;
 
import java.util.Date;
 
@Data
public class TaskProcessor {
 
    /**
     * 释放
     */
    public  static Boolean release(CompileTask task)  {
        Boolean flag=false;
        Thread thread=task.getThread();
        synchronized (thread) {
            try {
                if(null!=task.getTaskProcessor()){
                    TaskQueue.taskProcessors.put(task.getTaskProcessor());
                    TaskQueue.executeTasks.remove(task);
                    task.setEndTime(new Date());
                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
                    flag=true;
                    System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return flag;
        }
    }
 
}

Controller控制器接口实现

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;
 
import java.util.Date;
 
 
@RestController
@RequestMapping("task")
@Api(value = "数据编译任务", tags = "数据编译任务")
public class CompileTaskController {
 
    @ApiOperation(value = "添加等待请求 @author Tarzan Liu")
    @PostMapping("compile/{clazzId}")
    public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
        if(checkUser!=null){
            return  R.fail("已经正在排队!");
        }
        checkUser=TaskQueue.getExecuteTask(clazzId);
        if(checkUser!=null){
            return  R.fail("正在执行编译!");
        }
        //获取当前的线程
        Thread thread=Thread.currentThread();
        //创建当前的用户请求对象
        CompileTask compileTask =new CompileTask();
        compileTask.setThread(thread);
        compileTask.setClazzId(clazzId);
        compileTask.setStartTime(new Date());
        //将当前用户请求对象放入队列中
        try {
            TaskQueue.waitTasks.put(compileTask);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        TaskQueue.exec.execute(compileTask);
        return R.data(TaskQueue.waitTasks.size()-1);
    }
 
    @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")
    @PostMapping("sort/{clazzId}")
    public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.getSort(clazzId));
    }
 
    @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")
    @PostMapping("estimate/time/{clazzId}")
    public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
        return R.data(TaskQueue.estimatedTime(clazzId));
    }
 
    @ApiOperation(value = "任务释放 @author Tarzan Liu")
    @PostMapping("release/{clazzId}")
    public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
        CompileTask task=TaskQueue.getExecuteTask(clazzId);
        if(task==null){
            return  R.fail("资源释放异常");
        }
        return R.status(TaskProcessor.release(task));
    }
 
    @ApiOperation(value = "执行 @author Tarzan Liu")
    @PostMapping("exec")
    public R exec() {
        Long start=System.currentTimeMillis();
        for (Long i = 1L; i < 100; i++) {
            compile(i);
        }
        System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");
        return R.status(true);
    }
}

接口测试

根据任务id查询该任务前还有多少个任务待执行

JAVA多线程怎么实现用户任务排队并预估排队时长

根据任务id查询该任务预估执行完成的剩余时间,单位秒

JAVA多线程怎么实现用户任务排队并预估排队时长

补充知识

BlockingQueue

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

JAVA多线程怎么实现用户任务排队并预估排队时长

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

入队

offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞

put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞

offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞

被唤醒

等待时间超时

当前线程被中断

出队

poll():如果没有元素,直接返回null;如果有元素,出队

take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞

poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

被唤醒

等待时间超时

当前线程被中断 

到此,关于“JAVA多线程怎么实现用户任务排队并预估排队时长”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/140959.html

(1)

相关推荐

  • 免费刷粉网站推广,抖音怎么增加粉丝量?

    技术免费刷粉网站推广,抖音怎么增加粉丝量?免费刷粉网站推广,抖音怎么增加粉丝量?抖音视频一直都是很受大家欢迎的一个社交软件,不仅仅是刷抖音看作品,也有很多人在抖音上拍摄作品分享。分享抖音作品就得有点赞双击量,这样才能有人

    测评 2021年11月11日
  • 日本服务器租用的优势

    技术日本服务器租用的优势日本服务器是继香港服务器之后亚洲企业建站的又一个理想选择,其主要优势就是访问速度快、稳定性高和价格便宜等。有大陆优化和精品网两个线路,那么日本服务器怎么样?有哪些优势呢接下来本文将详细进行介绍1.

    礼包 2021年11月1日
  • 如何使用纯java config来配置spring mvc方式

    技术如何使用纯java config来配置spring mvc方式这篇文章将为大家详细讲解有关如何使用纯java config来配置spring mvc方式,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这

    攻略 2021年11月29日
  • 11月17日Java学习日记

    技术11月17日Java学习日记 11月17日Java学习日记面向对象编程(oop)面向对象:物以类聚,分类的思维模式,思考问题,首先解决问题需要哪些分类,然后对这些分类进行单独思考。最后,才对某个分类

    礼包 2021年11月18日
  • 《Unix/Linux系统编程》第六章学习笔记

    技术《Unix/Linux系统编程》第六章学习笔记 《Unix/Linux系统编程》第六章学习笔记信号和信号处理
    摘要
    本章讲述了信号和信号处理;介绍了信号和中断的统一处理,有助于从正确的角度看待信号;

    礼包 2021年11月13日
  • Spring AOP如何实现简单的日志切面

    技术Spring AOP如何实现简单的日志切面本篇文章给大家分享的是有关Spring AOP如何实现简单的日志切面,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看

    攻略 2021年10月27日