java中的消息队列怎么利用多线程实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
1,定义一个队列缓存池:
,//静态修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。 private static  List();
2,定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
private Integer offerMaxQueue =, 2000;
3,定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
new 线程(){ ,,,public void 运行(){ ,,,,,(真正的){ ,,,,,String ip =,空; ,,,,,try { ,,,,,,synchronized (queueCache), { ,,,,,,,Integer size =, queueCache.size (); ,,,,,,,如果(大?=0){//队列缓存池没有消息,等待....,,,,,queueCache.wait (); ,,,,,,,} ,,,,,,,Queue Queue =, queueCache.remove (0); ,,,,,,,如果(isIpLock (queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理 ,,,,,,,,queueCache.add(队列);该队列重新加入队列缓冲池,滞后处理, ,,,,,,,,继续; ,,,,,,,其他}{ ,,,,,,//这里是处理该消息的操作。 ,,,,,,,} ,,,,,,,size =, queueCache.size (); ,,,,,,,如果(size=0) {,,,, queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。 ,,,,,,,} ,,,,,,} ,,,,,},catch (Exception e), { ,,,,,,e.printStackTrace (); ,,,,,最后}{ ,,,,,,try {//检出该消息队列的锁 ,,,,,,,unIpLock (queueStr); ,,,,,,},catch (Execption e),{//捕获异常,不能让线程挂掉 ,,,,,,,e.printStackTrace (); ,,,,,,}, ,,,,,,,,,,,,,,,,,,,,,} ,,,,,} ,,}.start ();
4,检入队列
synchronized (queueCache), { 而(真){ Integer size =, queueCache.size (); 如果(size>=offerMaxQueue) { ,,,,,try { ,,,,,,queueCache.wait (); 继续;//继续执行等待中的检入任务。 ,}catch (InterruptedException e), { ,,e.printStackTrace (); ,} }//如果 如果(size<=offerMaxQueue&, size> 0) { ,queueCache.notifyAll (); } 打破;//检入完毕 }//, }
5,锁方法实现
/* * *才能锁 *,才能@param ip *才能@return *,才能@throws ,*/,public Boolean  isLock (String queueStr), { return 才能;this.redisManager.setnx (queueStr +“_lock",,“LOCK",, 10000) !=1; ,} ,//解锁 ,public void  unIpLock (String queueStr), { 如果才能(ip !=null) { ,,this.redisManager.del (queueStr +“_lock");//,,lock.unlock (); ,,} ,}
看完上述内容,你们掌握java中的消息队列怎么利用多线程实现的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!