java中的消息队列怎么利用多线程实现

  

java中的消息队列怎么利用多线程实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

1,定义一个队列缓存池:

,//静态修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。   private  static  List ();

2,定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。

private  Integer  offerMaxQueue =, 2000;

3,定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中

new 线程(){   ,,,public  void 运行(){   ,,,,,(真正的){   ,,,,,String  ip =,空;   ,,,,,try  {   ,,,,,,synchronized  (queueCache), {   ,,,,,,,Integer  size =, queueCache.size ();   ,,,,,,,如果(大?=0){//队列缓存池没有消息,等待....,,,,,queueCache.wait ();   ,,,,,,,}   ,,,,,,,Queue  Queue =, queueCache.remove (0);      ,,,,,,,如果(isIpLock (queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理   ,,,,,,,,queueCache.add(队列);该队列重新加入队列缓冲池,滞后处理,   ,,,,,,,,继续;   ,,,,,,,其他}{   ,,,,,,//这里是处理该消息的操作。   ,,,,,,,}   ,,,,,,,size =, queueCache.size ();   ,,,,,,,如果(size=0) {,,,, queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。   ,,,,,,,}   ,,,,,,}   ,,,,,},catch  (Exception  e), {   ,,,,,,e.printStackTrace ();   ,,,,,最后}{   ,,,,,,try {//检出该消息队列的锁   ,,,,,,,unIpLock (queueStr);   ,,,,,,},catch  (Execption  e),{//捕获异常,不能让线程挂掉   ,,,,,,,e.printStackTrace ();   ,,,,,,},   ,,,,,,,,,,,,,,,,,,,,,}   ,,,,,}   ,,}.start ();

4,检入队列

synchronized  (queueCache), {   而(真){   Integer  size =, queueCache.size ();   如果(size>=offerMaxQueue) {   ,,,,,try  {   ,,,,,,queueCache.wait ();   继续;//继续执行等待中的检入任务。   ,}catch  (InterruptedException  e), {   ,,e.printStackTrace ();   ,}   }//如果      如果(size<=offerMaxQueue&, size> 0) {   ,queueCache.notifyAll ();   }   打破;//检入完毕   }//,   }

5,锁方法实现

/* *   *才能锁   *,才能@param  ip   *才能@return   *,才能@throws    ,*/,public  Boolean  isLock (String  queueStr), {   return 才能;this.redisManager.setnx (queueStr +“_lock",,“LOCK",, 10000) !=1;   ,}   ,//解锁   ,public  void  unIpLock (String  queueStr), {   如果才能(ip !=null) {   ,,this.redisManager.del (queueStr +“_lock");//,,lock.unlock ();   ,,}   ,}

看完上述内容,你们掌握java中的消息队列怎么利用多线程实现的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

java中的消息队列怎么利用多线程实现