Java多线程编程实战之模拟大量数据同步

  


  

  

最近对于Java多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。

  

不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中。这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景:

  

已知某公司管理着1000个微信服务号,每个服务号有1 w ~ 50 w粉丝不等。假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信API的方式更新到本地数据库。

  


  

  

对此需求进行分析,主要存在以下问题:

  
      <李>单个服务号获取粉丝id,只能每次1 w按顺序拉取李   <李>微信的API对于服务商的并发请求数量有限制李   
  

单个服务号获取粉丝id,只能每次1 w按顺序拉取。这个问题决定了单个公众号在拉取粉丝id上,无法分配给多个线程执行。

  

微信的API对于服务商的并发请求数量有限制。这点最容易被忽略,如果我们同时有过多的请求,则会导致接口被封禁。这里可以通过信号量来控制同时执行的线程数量。

  

为了尽快完成数据同步,根据实际情况:整个数据同步可分为读数据和写数据两个部分。读数据是通过API获取,走网络IO,速度较慢,写数据是写到数据库,速度较快。所以得出结论:需要分配较多的线程进行读数据,较少的线程进行写数据。

  


  

  

首先,我们需要确定开启多少个线程(在生产中往往是使用线程池),线程数量需要根据服务器性能来决定,这里我们定为40个读取数据线程(将1000个公众号分为40份,分别在40个线程中执行),1个写入数据线程。(具体开多少个线程,取决于线程池的容量,以及可以分配给此业务的数量。具体的数字需要根据实际情况测试得出,比服务器阈值低一些较好。当然,配置允许范围内越大越好)

  

其次,考虑到微信对API并于发请求的限制,需要限制同时执行的线程数,使用<代码> java . util . concurrent。信号量>   

然后,我们需要知道数据何时读取,写入完毕,以控制程序逻辑以及终止程序,这里我们使用<代码> java.util.concurrent.CountDownLatch 进行控制。

  

最后,我们需要一个数据结构,用来在多个线程中共享处理的数据,此处同步数据的场景非常适合使用队列,这里我们使用线程安全的<代码> java.util.concurrent.ConcurrentLinkedQueue>   


  

  

由于本文重点关注多线程的使用,模拟代码只体现多线程操作的方法。代码里添加了大量的注释,方便各位读者阅读理解。

  JDK:

1.8         进口java.util.Arrays;   进口并不知道;   进口java.util.Queue;   进口java.util.concurrent.ConcurrentLinkedQueue;   进口java.util.concurrent.CountDownLatch;   进口java.util.concurrent.Semaphore;   进口java.util.concurrent.TimeUnit;/* *   * N个线程向队列添加数据   *一个线程消费队列数据   */公开课QueueTest {   私有静态Listdata=https://www.yisu.com/zixun/Arrays.asList (“a”、“b”、“c”,“d”,“e”);      私有静态最终int OFFER_COUNT=40;//开启的线程数量      私有静态信号信号量=new信号(20);//同一时间执行的线程数量(大多用于控制API调用次数或数据库查询连接数)      公共静态void main (String [] args)抛出InterruptedException {   Queue队列=new ConcurrentLinkedQueue<在();//处理队列,需要处理的数据,放置到此队列中      CountDownLatch offerLatch=new CountDownLatch (OFFER_COUNT);//提供线程门闩,每完成一个,门闩减一,lacth的计数为0时表示提供处理完毕   CountDownLatch pollLatch=new CountDownLatch (1);//调查线程门闩,门闩的计数为0时,表示投票处理完毕      可运行offerRunnable=()→{   尝试{   semaphore.acquire ();//信号量控制   }捕捉(InterruptedException e) {   e.printStackTrace ();   }      尝试{   (字符串数据:数据){   queue.offer(基准);   TimeUnit.SECONDS.sleep (2);//模拟取数据很慢的情况   }   }捕捉(InterruptedException e) {   e.printStackTrace ();   最后}{//在最后中执行latch.countDown()以及信号量释放,避免因异常导致没有正常释放   offerLatch.countDown ();   semaphore.release ();   }   };      可运行pollRunnable=()→{   int数=0;   尝试{   而(offerLatch.getCount()在0 | | queue.size()比;0){//只要提供的锁未执行完,或队列仍旧有数据,则继续循环   字符串调查=queue.poll ();   如果(调查!=null) {   System.out.println(调查);   数+ +;   }//无论是否调查到数据,均暂停一小段时间,可降低CPU消耗   TimeUnit.MILLISECONDS.sleep (100);   }   system . out。println(“总菌数:“+数);   }捕捉(InterruptedException e) {   e.printStackTrace ();   最后}{//在最后中执行latch.countDown(),避免因异常导致没有正常释放   pollLatch.countDown ();   }   };//启动线程(生产环境中建议使用线程池)   新线程(pollRunnable) .start ();//启动一个调查线程   for (int i=0;我& lt;OFFER_COUNT;我+ +){   新线程(offerRunnable) .start ();   }//模拟取数据很慢,需要开启40个线程处理//锁等,待会阻止主线程直到门闩的计数为0   offerLatch.await ();   pollLatch.await ();      system . out。println (“======");   }   }

Java多线程编程实战之模拟大量数据同步