使用Async进行流程控制

2015-10-23
6 min read

Async[1]提供了一些使用的工具,比如对象操作以及流程控制。我使用它的原因主要是解决js中回调函数嵌套过多的问题,在一边使用一边学习中习得了更多的功能的使用。在初识Promise就已经提及Async回调函数的控制内部机制。本文主要总结在项目中用到Async控制流程的部分,其他API的使用可以查看官网。

####1. waterfall 瀑布流

看到waterfall立马想到的是软工中的瀑布模型,其实意思也是字面意思。通过将任务分配到每个回调函数,可以充分利用到I/O的性能。但是当每一步的开始需要上一步返回的结果时,就进入到一个需要step by step按序执行任务的需求的,比如从数据库获取不同集合的文档数据,再处理成一个整体的数据存入数据库的另一个集合中。

这么说可能有点模糊,这里引入一个场景:在项目的邮件系统中,需要获取某个类别(category)的邮件列表,而前端向node端提供的查询条件为分页、状态过滤以及类别的id。在生成完查询语句后,我们需要将任务拆分为几步:第一步,通过Mongoose的.count()方法获取该类别下用户的未阅数量;第二步,按照前面的查询过滤条件向MongoDB请求获取一页的邮件列表。

Mongoose的方法遵循异步编程,也就是Async发挥用处的地方了。其waterfall的使用方法如下:waterfall(tasks, [callback]),tasks为函数数组,而callback参数可选,在每个子任务完成后执行,并传递最后一个task的回调参数。在tasks中每个task通过callback(null, param)来传递到下一个task,而第一个参数为捕获的错我,null表示没有错误,当然也来用传递标识量提前退出waterfall。以function(param1,…,callback){}分割每个task,一般出现回调函数时,就在回调函数中跳到下一个task。具体如下代码:

async.waterfall([
  //task1:获取cat下未阅邮件的数量
  function(callback){
    var uncheckedCondition = {};
    uncheckedCondition.$or = [
      { 'privateInfo._creator': {
        $ne: new ObjectId(getCondition.userId)
        }
      },
      {
        'privateInfo._creator': new ObjectId(getCondition.userId),
        'privateInfo.userStatus': { $ne: 'scored' }
      }
    ];    
    uncheckedCondition.commonStatus = getCondition.commonStatus;
    uncheckedCondition['category'] = catId;
    Mails.count(uncheckedCondition, function(err, count){
      if(err){
        console.log('getsByCatId ERR: count unchecked mails err ' + err);
        res.status(500).json({ 'message': 'ERR: count unchecked mails err'});
      }else{
        callback(null, catId, count);
      }
    });
  },
  //task2:获取cat下查询条件下的邮件列表
  function(catId, unchecked, callback){
    //然后再查询邮件列表
    queryCondition['category'] = catId;        
    query = Mails.find(queryCondition).select(select).sort(sort).skip(numSkip).limit(numPerPage).lean();
    //获得本次查询的邮件总数量
    Mails.count(queryCondition, function(err, count){
      if(err){
        console.log('ERR: count mails failed to ' + err);
        res.status(500).json({ 'message': 'ERR: count mails err'});
      }else{
        //执行查询操作
        query.exec(function(err, mails){
          if(err){
            console.log('ERR: find mails failed: ' + err);
            res.status(500).send(err).end();
          }else{
            mails.count = count;
            _genAndSendMailList(req, res, mails, unchecked);
          }
        });       
      }
    });
  }
]);

对于上述代码中不熟悉的Mongoose操作可以看Mongoose系列中了解详细内容。细想一下,waterfall就是将异步运行强扭成了同步运行,所以使用waterfall还是需要仔细划分业务逻辑,尽量减少watrefall的水量(water)。

####2. parallel 并行

parallel(tasks, [callback]) 的参数和waterfall一样,不过tasks是并行执行的。在原始的流程中,通过条件判断来确定需要更新数据库中的Configs配置信息。后来发现只要当前端传递过来的参数中有更新数据的关键字段就表示需要修改某部分配置信息。那我们只需要并行的检测是否有一些字段就可以同时更新数据库的数据了。代码如下:这里精简了一些对async内容无意义的代码。

//通过条件判断修改的数据是哪块字段
async.parallel([
  function(cb){
    //imap的修改
    if(updateData.hasOwnProperty('imapHost')){
      //imap接收邮件协议配置
      if(!(检测其他相关字段)){
        res.status(400).json({ 'message': 'partial response body data.'});
      }else{
        Configs.update({}, {}, function(err, numAffected){
          if(err){
            console.log('changeConfigs ERR: update imap config  err:' + err);
            res.status(500).json({ 'message': err});
          }else{
            console.log('changeConfigs INFO: update imap config successfully.');
            cb(null, 'imap');
          }
        });
      }
    }else{
      cb(null, '');
    }      
  },
  function(cb){
    //smtp的修改
    if(updateData.hasOwnProperty('smtpHost')){
      //stmp发送邮件设置
      if(!(检测其他相关字段)){
        res.status(400).json({ 'message': 'partial response body data.'});
      }else{
        Configs.update({ '_id': new ObjectId('55a22d3dc1bf661c27d37bf0')}, {
          $set: {
            'smtp.user': updateData.addressEasy.trim(),
            'smtp.password': updateData.passwordEasy.trim(),
            'smtp.host': updateData.smtpHost.trim(),
            'smtp.port': updateData.smtpPort             
          }
        }, function(err, numAffected){
          if(err){
            console.log('changeConfigs ERR: update stmp config  err:' + err);
            res.status(500).json({ 'message': err});
          }else{
      }
    }else{
      cb(null, '');
    }         
  },
            console.log('changeConfigs INFO: update stmp config successfully.');
            cb(null, 'smtp');
          }
        });
  function(cb){
    //检查是否为修改过滤设置
    if(updateData.hasOwnProperty('filterWords')){
      Configs.update({ '_id': new ObjectId('55a22d3dc1bf661c27d37bf0')}, {
        'filterWords': updateData.filterWords.trim(),
        'filterSize': updateData.filterSize
      }, function(err, numAffected){
        if(err){
          console.log('changeConfigs ERR: update filter config  err:' + err);
          res.status(500).json({ 'message': err});
        }else{
          console.log('changeConfigs INFO: update filter config successfully.');         
          cb(null, 'filter');
        }
      });
    }else{
      cb(null, '');
    }
  }
], function(err, results){
  var updateKey = '';
  
  //历遍task的运行结果:输出修改了哪些字段。
  _.map(results, function(result){
    updateKey += result;
  });
  res.status(201).json({ 'message': 'update ' + updateKey + ' config OK.'});
});

####3. each 数组迭代与 forEachOf 对象迭代

正如中文表述的一样,两者在方法使用上时一致的,只是一个是对数组迭代,一个是对对象迭代。使用方法为: each(arr, iterator, [callback]) 对应对象的操作只需要修改方法名为 forEachOf(obj, iterator, [callback])。迭代器iterator针对每个元素进行操作,在操作之后。需要强调的是,迭代时并发处理每段函数,如果需要按序处理每个元素的话需要调用 eachSeries(arr, iterator, [callback])。

实际使用的场景是这样的,当修改category的status字段为1,将category的_id更新到每个用户的colMenu和ruleMenu数组。所以我们将迭代用户数组,先将用户数组的用户数组需要更新的字段获取,在async.each()中对每个用户的对应数组增加元素。具体如下代码:

async.each(users, function(user, cbUserEach){
  if(frontData.type === 'elite'){
    //更新数据库
    Users.update({ '_id': user._id },
    {
      $push: {
        'colMenu': cat._id.toString()
      }
    }, function(err, numAff){
      if(err){
        console.log('createCat ERR: update user colMenu: ' + (err || numAff));
        cbUserEach(err);
      }else{
        cbUserEach();
      }
    });
  }else{
    //更新数据库
    Users.update({ '_id': user._id },
    {
      $push: {
        'ruleMenu': cat._id.toString()
      }
    }, function(err, numAff){
      if(err){
        console.log('createCat ERR: update user ruleMenu: ' + (err || numAff));
        cbUserEach(err);
      }else{
        cbUserEach();
      }          
    });                
  }
}, function(err){
  if(err){
    console.log('createCat ERR: update users menu: ' + err);
  }else{
    console.log('createCat INFO: update users menu successfully.');
  }
});

####4. whilst 有条件循环

async.whilst(test, fn, callback) 重复地执行fn,直到test返回的false进入回调函数callback。所以这里需要有一些变量来判定条件的是否。

项目中使用了solr对数据库中的数据建立索引。在邮件过滤的业务中,需要获取solr的返回,在下列代码,gotLoop变量在每个迭代中增加一次,表示从solr端获取的一次数据。而pageLoop是以500份分割索引数获得的总页数。这样每次迭代就只获取500份数据的_id。

async.whilst(
  //as while condition
  function(){ return gotLoop < pageLoop; },
  //loop work
  function(whilstCb){
    var loopQuery = client.createQuery()
      .q(qStr)
      .sort('mail.date desc')
      .start(gotLoop * 500)
      .rows(500)
      .fl('_id');
    client.search(loopQuery, function(err, loopObj){
      if(err){
        process.send({ 'error': 'solr search loop err: ' + err + ' at page ' + gotLoop});
      }else{
        if(loopObj.hasOwnProperty('response') && loopObj.response.hasOwnProperty('docs')){
          loopObj.response.docs.forEach(function(loopDoc){
            mailIdsArr.push(loopDoc._id);
          });
          gotLoop++;
          console.log('FILTER Child process run to pass the mailIdArr to update the mails collection.');
          //使用waterfall传递数组
          cb(null, mailIdsArr);
        }
      }
    });
  },
  function(err){
    //回调函数
    if(err){
      process.send({ 'error': 'async whilist err: ' + err + ' at page ' + gotLoop});
      process.exit();
    }else{
      process.send({ 'error': 'async whilist stop with: ' + gotLoop});
      process.exit();
    }
  }
);

注意代码中的25行,调用的回调函数 cb(null, mailIdsArr); 为waterfall的回调函数,下一个task为更新mailId指定的邮件相关字段。将两个结合方式可能会占用不少资源,但提供老清晰的逻辑,便于代码修改和审查。


####总结

通过Async这几个比较使用的方法,解放了js中回调嵌套过多的问题。使用Async的重点是何时使用这个工具,比如提高I/O操作的效率以及减少重复的代码。

#####参考:

  1. Async的github页面
comments powered by Disqus