CSVを1行ずつ処理する

PCが突然死んでしまった。

きっかけは最近メイン環境の調子が悪くPCの起動に時間がかかっていた。 それだけならよかったのだが、操作中になんの前触れもなく文字通り画面が消えてOSごと起動できなくなってしまった。 今思えば起動できなくなったOSを再インストールするだけでよかったかもしれないが、同じPCに組んでいるRAID1のバックアップ用のディスクをfsckしたらなぜか両方のディスクの中身ごと全部消されてしまった。

幸いこうして同じPCで復旧作業は完了することができたので、死んだというには少々突飛な表現だったかもしれない。

他にもバックアップ先は分散していたものの、直近まで作業していたプロジェクトのディレクトリやすべてバックアップしきれていなかったファイルはすべて失ってしまった。

閑話休題。今回はChatGPTに質問して興味深いコードを生成できたので残しておこうと思う。 タイトルの通り、手元におよそ2000行くらいあるCSVのファイルがある。 このCSVの行データをある関数の引数として使おうと思う。

もしRubyで順当に書くのであればこうだ:

def process(row)
  # ...
end

CSV.foreach("your-file.csv", headers: true) do |row|
  process(row)
end

このCSV.foreachの中身を見るとわかりやすい。

def self.foreach(path, **options, &block)
  return to_enum(__method__, path, options) unless block_given?
  open(path, options) do |csv|
    csv.each(&block)
  end
end

CSVファイルを一度メモリに呼び出して1行ごとブロックが評価されるのでprocess関数を呼び出せている。

今回はNode.JSで同様の操作を行ってみようと思う。 ただし、Node.JSにはfs.createReadStreamが使えるため、ファイルは分割されたチャンクごとに呼び出されるはずである。

const fs = require('fs');
const csv = require('csv-parser');

const results = [];

// Replace 'your-file.csv' with the path to your CSV file
fs.createReadStream('your-file.csv')
  .pipe(csv())
  .on('data', (data) => {
    // Process each row of data here
    results.push(data);
  })
  .on('end', () => {
    // All rows have been processed; you can work with the parsed data here
    console.log(results);
  });

この場合、CSVファイルを呼び出して行が読み出せた単位でdataというイベントハンドラが呼ばれて予め用意されたresultsという空の配列にプッシュしていく。 そして全て読み終わってから配列の中身をプリントアウトする。 ここまでは何の変哲もないコードである。

これではせっかくNode.JSを使う意味合いも薄いのでジェネレータを使ったらどうなるかと訊ねてみた。

const fs = require('fs');
const csv = require('csv-parser');

// Define a generator function to process data asynchronously
async function* processData() {
  // Replace 'your-file.csv' with the path to your CSV file
  const csvStream = fs.createReadStream('your-file.csv').pipe(csv());

  for await (const data of csvStream) {
    // Process each row of data here
    yield data;
  }
}

// Use the generator function to process CSV data
async function main() {
  for await (const row of processData()) {
    // You can process each row of data here
    console.log(row);
  }
}

main()
  .then(() => {
    console.log('CSV processing complete');
  })
  .catch((error) => {
    console.error('Error:', error);
  });

非常に明快なコードが出力された。

たしかちょうどPromiseがサポートされる前後に出てきたジェネレータという概念はなかなか理解できなかったのだが、これならわかりやすい。 普段プログラミングを行うときは関数を単純にしたいので1つのループに対して1回分の処理を書くイメージだと思う。 processDataが興味深いのはCSV全体の読み込みを1つの関数で行っていて、1行分の読み込みを行ったらそのあと停止するのだろう。 Rubyにもyieldはあるのだが、行き止まりを意味するyieldが効いてくる。

  1. main()が実行され、processData()でCSVの読み出しを開始する
  2. yield dataで関数の実行を止める
  3. main()内でconsole.log(row)で1行分のコードを実行する
  4. processData()で処理を再開する
  5. processData()が処理を完了するまで2..4を繰り返す

具体的にはこういう感じで読み進めればよい。 もしfsPromise.readFileで同じコードを書いたとしたら最初のRubyで書いた例のように1行ごとという意味にはならない。 本来fs.createReadStreamは直接await使えないだろう。

厳密には改行コードごとに1行ずつ読んでいるわけではないかもしれないが、極めてメモリに配慮した作りだと思う。 freeなどのコマンドを使って検証しているわけではないので完全に言い切れるかは怪しいけれども。 あるいはRubyやfsPromise.readFileを使った場合でもメモリに優しい書き方があるかもしれない。

これでも十分目的は果たせたとは思うけれども、検証しやすいようにsleepハックというものを行う。 RubyにはKernelクラスにsleepというメソッドが存在するが、Node.JSではsetTimeoutpromisifyして非同期のsleep関数を定義する必要がある。 ChatGPTに続けてsleepを組み込んだバージョンも質問してみた。

const fs = require('fs');
const csv = require('csv-parser');
const { promisify } = require('util');

// Promisify setTimeout to create a sleep function
const sleep = promisify(setTimeout);

// Define a generator function to process data asynchronously
async function* processData() {
  // Replace 'your-file.csv' with the path to your CSV file
  const csvStream = fs.createReadStream('your-file.csv').pipe(csv());

  for await (const data of csvStream) {
    // Sleep for 1 second before processing each row
    await sleep(1000); // 1000 milliseconds = 1 second

    // Process each row of data here
    yield data;
  }
}

// Use the generator function to process CSV data
async function main() {
  for await (const row of processData()) {
    // You can process each row of data here
    console.log(row);
  }
}

main()
  .then(() => {
    console.log('CSV processing complete');
  })
  .catch((error) => {
    console.error('Error:', error);
  });

そっちに入れるかとは思ったものの、想定どおりのコードが生成された。 意味合いとしてはmain関数内で1秒待つではなく、CSVの読み込みができたら1秒待つという感じだろう。 これは実行してみるとたしかに1行ごとに1秒処理しているのがわかる。

もし一番最初の例で出たon('data')の引数にコールバックを書く形式だったら1行ごとに1秒まつのではなく、行を読むごとにsleep関数を実行するだけで肝心の次の処理まで待つという意味合いではなくなってしまう。 このあたりの理解が今回は一番重要だと思う。

ChatGPTがなくても従来の検索エンジンに頼る方法でもいずれはこの方法を導き出せたかもしれないが、失敗する覚悟で聞いてみると案外動くコードが出せたりする。 これは同時にズルをしている感覚にも陥りがちなのは理解できるけれども、大学の課題ではないので今後も大いにChatGPTやそれに類いする生成AIのお世話になりたいと思った次第である。 ちなみに今回のブログの文章は生成させたものではないため、必ずしも内容が正しくない可能性もあるのであしからず。