Node.js(ES6)のPromiseを使用して非同期のループ処理を行う

同期処理なら普通のforループで問題ないけど中に非同期処理が入っていると少々面倒




今回やりたかったこと
csvを読み込んでデータベースにデータを投入するバッチ処理を行う


// データはcsvファイル
let date = require("fs").readFileSync("test.csv", "utf-8").split("\r\n")

// データを操作する何か
const db = require("db")

var inserted = 0
var failed = 0
for (let index = 0; index < data.length; index++) {
  const elements = data[index].split(",")
  const datum = {
    data0: elements[0]
    data1: elements[1]
    // ...
  }
  // 非同期処理
  db.insert(datum, function (err, body) {
   // データが挿入時の処理だが通らない
    if (err) {
      inserted++
    } else {
      failed++
    }
  })
}


実行後しばらくして以下のエラーが発生

< --- Last few GCs --->

32984 ms: Mark-sweep 1390.6 (1432.2) -> 1390.0 (1434.2) MB, 18.3 / 0.0 ms (+ 3.9 ms in 7 steps since start of marking, biggest step 2.0 ms) [allocation failure] [GC in old space
requested].
32998 ms: Mark-sweep 1390.0 (1434.2) -> 1389.6 (1434.2) MB, 14.3 / 0.0 ms [allocation failure] [GC in old space requested].
33017 ms: Mark-sweep 1389.6 (1434.2) -> 1389.5 (1403.2) MB, 18.2 / 0.0 ms [last resort gc].
33030 ms: Mark-sweep 1389.5 (1403.2) -> 1389.5 (1403.2) MB, 13.7 / 0.0 ms [last resort gc].


< --- JS stacktrace --->

==== JS stack trace =========================================

Security context: 0x1814890cfb51
1: SparseJoinWithSeparatorJS(aka SparseJoinWithSeparatorJS) [native array.js:~75] [pc=0x34634daa5045] (this=0x181489004381 ,w=0x155f04aa1971 ,F=0x22e
101067ca1 < JS Array[43200]>,x=43200,I=0x1814890b46e1 ,J=0x1915d1f19199 )
2: DoJoin(aka DoJoin) [native array.js:137] [pc=0x34634da0d8...

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
1: node::Abort() [/Users/xxxx/.nvm/versions/node/v6.9.5/bin/node]
2: node::FatalException(v8::Isolate*, v8::Local, v8::Local) [/Users/xxxx/.nvm/versions/node/v6.9.5/bin/node]
3: v8::internal::V8::FatalProcessOutOfMemory(char const*, bool) [/Users/xxxx/.nvm/versions/node/v6.9.5/bin/node]
4: v8::internal::Factory::NewRawOneByteString(int, v8::internal::PretenureFlag) [/Users/xxxx/.nvm/versions/node/v6.9.5/bin/node]
5: v8::internal::Runtime_SparseJoinWithSeparator(int, v8::internal::Object**, v8::internal::Isolate*) [/Users/xxxx/.nvm/versions/node/v6.9.5/bin/node]
6: 0x34634d7092a7
7: 0x34634daa5045
Abort trap: 6


どうもインサート処理が行われずにヒープにひたすら溜まっていったらしい。


実行順序を制御するために以下のように同期処理を実装する。
参考
http://stackoverflow.com/questions/40328932/javascript-es6-promise-for-loop

// 挿入データ数
var inserted = 0
// 挿入失敗数
var failed = 0
// 即時関数にすると var failed = 0(function loop(i)... という解釈をしてしまい少々危うげなので一時変数に代入
const insert = function loop(i) {
  const promise = new Promise((resolve, reject) => {

    const timeout = 100
    setTimeout(() => {
      const elements = data[i].split(",")
  const datum = {
   data0: elements[0]
   data1: elements[1]
   // ...
  }
      table.insert(datum, key, function (err, body, header) {
        if (err) {
          failed++
        } else {
          inserted++
        }
      })
      resolve()
    }, timeout)
  }).then(() => i < data.length ? loop(i + 1) : {})
}
insert(0)

さっくり説明すると1つ処理が終わるとインデックスを加算して次の処理に移るといった感じか。
再帰っぽい?

セミコロンレスでやっていると即時関数が構文エラーになる模様
なので一旦変数に格納して呼んでいる。

またstackoverflowの
then( () => i >= 10 || loop(i+1) );
が微妙に読みにくかったので書き換えている。
then(() => i < data.length - 1 ? loop(i + 1) : {})

インデックスがデータサイズより小さかったら関数実行、それ以外は何もしない(終了)というふうに読める...はず

timeoutは処理に合わせて設定する(今回のケースだとデータベースが10件/sでしか挿入できないので100ms毎に実行している)
async.eachを使うともっと楽らしい(使いたくないので使わなかった)

2017年4月21日金曜日