灯火互联
管理员
管理员
  • 注册日期2011-07-27
  • 发帖数41778
  • QQ
  • 火币41290枚
  • 粉丝1086
  • 关注100
  • 终身成就奖
  • 最爱沙发
  • 忠实会员
  • 灌水天才奖
  • 贴图大师奖
  • 原创先锋奖
  • 特殊贡献奖
  • 宣传大使奖
  • 优秀斑竹奖
  • 社区明星
阅读:2940回复:0

DelayQueue在容错时的使用

楼主#
更多 发布于:2012-09-08 09:37


1:异步容错的处理需求
     遇到错误消息后,把消息写入到表中同时写入到queue中,把这个错误的内容异步通知到其他系统中去。同步的时间间隔以2的N次方递增,设计的前提是尽量减小数据库的压力。
     2:设计
      java.util.concurrent.DelayQueue中的对象必须实现java.util.concurrent.Delayed的接口,Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
    队列中的对象设计:
public class DelayDomain implements Delayed{
      /**
       * 处理机制:按照发送次数2的N次方秒递增。
       * @param payinternalNotify
       */
      public DelayDomain(int sendtime , String url){
            this.sendTime = (new Date()).getTime() +  (long)Math.pow(2, sendtime) * TIME_UNIT;
            this.url = url;
      }    
      @Override
      public int compareTo(Delayed obj) {
            DelayDomain delayDomain = (DelayDomain)obj;
            long timeout = sendTime – delayDomain.sendTime;
        return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;
      }
      @Override
      public long getDelay(TimeUnit unit) {
            return sendTime – System.currentTimeMillis();
      }
    
      private static final int TIME_UNIT = 1000 ; //按照秒来递增
      private long sendTime ;
      private String url ;
      public String getUrl() {
            return url;
      }    
}
     Queue的代码:
     public class SendQueue {
      public static void put(DelayDomain DelayDomain)
            throws InterruptedException{
            QUEUE.put(DelayDomain);
      }
      public static DelayDomain take()throws InterruptedException{
            return QUEUE.take();
      }    
      /**
       * 添加错误消息到队列中,
       * @param payInternalNotify
       */
      public static void addSendUrl(int sendTime , String url)
            throws InterruptedException{
            DelayDomain DelayDomain = new DelayDomain(sendTime , url);
            put(DelayDomain);
      }
      private SendQueue(){};
      //服务队列
      private static final BlockingQueue<DelayDomain> QUEUE =  new DelayQueue<DelayDomain>();
   }
     测试代码:
     public class DelayMain {
      public static void main(String[] args) throws Exception{
            System.out.println("Start time @ " + getNow());
            SendQueue.addSendUrl(2 , "www.atcpu.com");
            SendQueue.addSendUrl(1 , "www.google.com");
            SendQueue.addSendUrl(3 , "www.hao123.com");
            while(true){
                  DelayDomain domain = SendQueue.take();
                  System.out.println(domain.getUrl() + " @ " + getNow());
            }
      }    
      private static String getNow(){
            SimpleDateFormat sdf = new SimpleDateFormat("HH:mms");
            return sdf.format(new Date());
      }
   }
     输出结果:
     Start time : 11:20:21
     www.google.com @ 11:20:23
     www.atcpu.com @ 11:20:25
     www.hao123.com @ 11:20:29
    我们看到google在2秒后出队列,百度的4秒,hao123的8秒。放到队列中会自动按照时间顺序来排序,只有时间到了才会被take出队列,否则一直等待。

     3:设计缺点
    修改数据库状态不能自动同步了。需要通过脚本来执行一些过期的内容,或者通过接口方式处罚容错。


喜欢0 评分0
游客

返回顶部