PostgreSQL に大きめのデータを高速にインポートしたい

.NET Core のアプリケーションで、ある程度まとまった量の外部データを PostgreSQL へ取り込むのに EntityFramework でいけるかなと考えていたのですが想定よりデータ量が多く、運用を考えるとあまり実用的な速度がでなかったので Npgsql の Binary COPY を使ってみました(まあこの用途で ORM 使うなよという話しではありますが…)。

COPY | Npgsql Documentation

100万件のデータを愚直に Add メソッドで追加するパターンと、Binary COPY で追加するパターンの処理時間を各3回ずつ計測しました。

※ 今回は下記のライブラリの実装を参考にさせていただきました。

Model

データアクセスに使う Entity と Context です。Bar テーブルに Id、 Title 、 Body 列があります。これは両パターンで共通です。

public class FooDbContext : DbContext
{
    public DbSet<Bar> Bars { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseNpgsql("/* connection string */");
    }
}

public class Bar
{
    [Key] public int Id { get; set; }
    public string Title { get; set; }
    public string Body { get; set; }
}

Add メソッドのパターン

internal class Program
{
    private static async Task Main(string[] args)
    {
        var bars = new List<Bar>();
        for (var i = 0; i < 1000000; i++)       // ダミーデータを生成
        {
            bars.Add(new Bar
            {
                Title = Guid.NewGuid().ToString(),
                Body = Guid.NewGuid().ToString()
            });
        }

        var context = new FooDbContext();

        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        using (var transaction = context.Database.BeginTransaction())
        {
            foreach (var bar in bars)
            {
                await context.Bars.AddAsync(bar);
            }

            await context.SaveChangesAsync();
            transaction.Commit();
        }

        sw.Stop();

        var ts = sw.Elapsed;
        Console.WriteLine($"{ts.Hours}:{ts.Minutes}:{ts.Seconds}.{ts.Milliseconds}");
    }
}

実行結果

  • 1回目
    • 0:4:12.771
  • 2回目
    • 0:4:10.858
  • 3回目
    • 0:4:11.68

4分以上かかりました。流石に遅いですね(通常このデータ量でこの方法はとらないかと思いますが)。

Binary COPY のパターン

internal class Program
{
    private static async Task Main(string[] args)
    {
        var bars = new List<Bar>();
        for (var i = 0; i < 1000000; i++)       // ダミーデータを生成
        {
            bars.Add(new Bar
            {
                Title = Guid.NewGuid().ToString(),
                Body = Guid.NewGuid().ToString()
            });
        }

        var context = new FooDbContext();

        var sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        using (var transaction = context.Database.BeginTransaction())
        {
            context.Database.ExecuteSqlCommand(
                $@"CREATE TEMP TABLE foo_temp_input(title text, body text) ON COMMIT DROP");

            var connection = (NpgsqlConnection) context.Database.GetDbConnection();

            using (var importer = connection
                .BeginBinaryImport($@"COPY foo_temp_input (title, body) FROM STDIN (FORMAT BINARY)")
            )
            {
                foreach (var bar in bars)
                {
                    importer.StartRow();
                    importer.Write(bar.Title);
                    importer.Write(bar.Body);
                }

                importer.Complete();
            }

            context.Database.ExecuteSqlCommand(
                $@"INSERT INTO ""Bars"" (""Title"", ""Body"") SELECT title, body FROM foo_temp_input");

            transaction.Commit();
        }

        sw.Stop();

        var ts = sw.Elapsed;
        Console.WriteLine($"{ts.Hours}:{ts.Minutes}:{ts.Seconds}.{ts.Milliseconds}");
    }
}

実行結果

  • 1回目
    • 0:0:13.993
  • 2回目
    • 0:0:14.760
  • 3回目
    • 0:0:14.705

14秒前後になりました。Add メソッドの場合と比べるととてもはやくなりました。

少し補足

ドキュメントを読むと Binary COPY の場合、最後に Complete() メソッドを呼ばないと writer が破棄されたタイミングでロールバックされるようです。また Cancel() メソッドで処理の取り消しが可能なようです。なので try/catch でエラー時に Cancel() メソッドを呼び出すと良さそうです(すいません未検証なのでこのような言い回しをしています……)。

……という仕様のようなので今回の Binary COPY のパターンだとトランザクションは不要です。サンプルのコードで敢えてトランザクションを使っているのはデータ更新対象のテーブルが複数あった場合の対応です。Binary COPY で一旦一時テーブルにインポートし、その後目的のテーブルに insert しています。複数テーブルに対して更新する場合この方法でトランザクションが有効になります。

アプリケーションに手軽にWebサーバを組み込む(.Net)

.NetアプリケーションにちょっとしたWeb APIを組み込みたい場合にお手軽に実装できたらなあと考えていましたが、EmbedIOというパッケージが良い感じでした。

unosquare/embedio: A tiny, cross-platform, module based web server for .NET

今回はコンソールアプリケーションに社員情報をJsonで登録/取得(Get/Post)する WebApi を実装します。

※ 実行環境として .Net Framework4.7.1 で動かしていますが、EmbedIO 自体は .Net Core にも対応しています。

EmbedIO のインストール

Visual Studioを管理者権限で起動しコンソールアプリケーションのプロジェクトを作成したら、パッケージマネージャから EmbedIO をインストールします。

PM> Install-Package EmbedIO

Employeeクラス作成

社員情報を表すEmployeeクラスを作成します。このクラスは社員情報の保持とApiのインターフェースに使います。

各プロパティの属性に JsonProperty を指定します。これは Jsonシリアライズ/デシリアライズ する際の項目名を指定します(これは Newtonsoft.Json と同じなので使った事があれば馴染み深い記法だと思います)。

Employee.cs

using Unosquare.Swan.Attributes;

namespace ConsoleApp1
{
    public class Employee
    {
        [JsonProperty("id")]
        public int Id { get; set; }

        [JsonProperty("name")]
        public string Name { get; set; }

        [JsonProperty("age")]
        public int Age { get; set; }

    }
}

EmployeeController クラス作成

Httpリクエストとレスポンスを処理するコントローラを作成します。MVCフレームワークを使ったことがあれば直感的に理解できるのではと思います。

EmployeeController.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Unosquare.Labs.EmbedIO;
using Unosquare.Labs.EmbedIO.Constants;
using Unosquare.Labs.EmbedIO.Modules;
using Unosquare.Swan;

namespace ConsoleApp1
{
    public class EmployeeController : WebApiController
    {
        public static void Setup(WebServer server)
        {
            server.RegisterModule(new WebApiModule());
            server.Module<WebApiModule>().RegisterController<EmployeeController>();
        }

        public static readonly List<Employee> Employees = new List<Employee>();

        [WebApiHandler(HttpVerbs.Get, "/api/employee/{id}")]
        public async Task<bool> GetEmployee(WebServer server, HttpListenerContext context,
            int id)
        {
            var employee = Employees.Where(v => v.Id == id);
            if (employee.Any())
            {
                context.Response.StatusCode = (int) HttpStatusCode.OK;
                return await context.JsonResponseAsync(employee); // Jsonにシリアライズして返す
            }
            else
            {
                // 対象のIdが存在しない場合は404を返す
                context.Response.StatusCode = (int) HttpStatusCode.NotFound;
                return await context.StringResponseAsync("NotFound");
            }
        }


        [WebApiHandler(HttpVerbs.Post, "/api/employee")]
        public async Task<bool> PostEmployee(WebServer server, HttpListenerContext context)
        {
            try
            {
                var employee = context.ParseJson<Employee>(); // postで受け取ったJsonのデシリアライズ
                Employees.Add(employee);

                $"『Id:{employee.Id}, name:{employee.Name}, age{employee.Age}』 が追加されました".Trace();

                context.Response.StatusCode = (int) HttpStatusCode.Created;
                return await context.StringResponseAsync(string.Empty);
            }
            catch (Exception ex)
            {
                return await context.JsonExceptionResponseAsync(ex);
            }
        }
    }
}

Webサーバの起動

エントリポイントにWebサーバの起動の記述を追記します。下記コードでは 9000 ポートを利用するように設定をしています。

Program.cs

using System.Threading.Tasks;
using Unosquare.Labs.EmbedIO;
using Unosquare.Labs.EmbedIO.Constants;

namespace ConsoleApp1
{
    internal class Program
    {
        private  static async Task Main(string[] args)
        {
            var url = $@"http://localhost:9000";    // リモートアクセスを許可する場合は http://*:9000 

            using (var server = new WebServer(url, RoutingStrategy.Regex))
            {
                EmployeeController.Setup(server);
                await server.RunAsync();
            }
        }
    }
}

これで実装は完了です。

動かしてみる

今回はコンソールアプリケーションなので、デバッグモードで実行すると次のようなコンソールが起動します(正常に起動しない場合は Visual Studio を管理者権限で起動していない可能性があるので、管理者権限で起動しなおします)。

f:id:mou_kat:20180528210635p:plain

IDを100 で登録してみます。

f:id:mou_kat:20180528214226p:plain

f:id:mou_kat:20180528214347p:plain

上記で登録したID(100)でデータを取得してみます。

f:id:mou_kat:20180528214542p:plain

無事取得できました。

Unosquare.Swan について

本題から逸れますが、コントローラの cs $"『Id:{employee.Id}, name:{employee.Name}, age{employee.Age}』 が追加されました".Trace(); (.Trace())部分について補足です。

これは EmbedIO インストール時に依存ライブラリとして一緒にインストールされる Unosquare.Swan.Lite に stringの拡張メソッドとして定義されています。 このメソッドを使い分けることでコンソールに表示させるメッセージタイプを制御できます。メッセージの種類は swan/Terminal.Enums.cs at master · unosquare/swanLogMessageType として定義されています。

コンソールに出力するメッセージタイプを指定したい場合以下のように指定します。

// エラーと警告のみ表示
Terminal.Settings.DisplayLoggingMessageType =
                | LogMessageType.Error
                | LogMessageType.Warning;

NUC を買ったずい…

windowsにインストールしているdockerがあまり安定せず大変不便なので、しかたなくコンテナ用に intel nuc を買ってしまいました…。 4年くらい前に初代の nuc を買ったので2台目です。

今回は予定になかった出費なのと常時稼働するので、値段と消費電力を考えて Celeron モデルにしました。 メモリは8GBまでしか積めないけど、物理4コアなのでコンテナ用にちょうど良いかなと。 あとはNASとオブジェクトストレージも欲しいと思っていたのでその役割も担ってもらおうとHDDとミラーリング機能があるUSB3.1対応のHDDケースも一緒に買いました。

OSは CentOS7 の最小構成に sambaとdockerをインストールして、あとはオブジェクトストレージとして minio かな…。

うーん、PC欲しかったのにまた遠のいてしまったような…。

f:id:mou_kat:20170902141325j:plain

Raspberry Pi 3 を買った

ただ、電子工作スキルが無いので色々覚える事があった。オームの法則とかは中学で習ったような気はする…。

まあそんなこんなでLEDとボタンをつけて、ボタンを押したらLEDを on - off みたいなので出来た。知りたかった Windows 10 IoT Core からGPIOポートへのアクセス方法がわかったのでとりあえずは良いかなあ(そもそもGPIOが何かもよくわかっていなかったのでその辺りの理解も)。

時間があるときにもう少し触ってみる。

f:id:mou_kat:20170816221518j:plain

今回のSteamのサマーセール買ったもの

金額はセール時のものです。

  • Hacknet(¥250)
    • 結構前から気になっていたのでセールを機に買いました。まだ遊んでいない…。
  • 911 Operator (¥866)
    • 911って日本で言うところの110番と119番が一緒になったような番号らしい。ゲーム内容は911のオペレータになって適切な緊急車両を派遣したりしなかったり(いたずら電話とかあるので)、応急処置の指示をだしたり。緊張感あってそこそこ楽しめた。
  • Lifeline(¥99)
    • どこかの衛星に不時着した宇宙船の生存者からの通信を受信したという設定のゲーム。通信はテキストで送られてきて、たまにアドバイス(選択方式)をする事でストーリー分岐。先の展開がとても気になったし金額分以上に楽しめた(いや99円だったけどw)。

今回のセールでは以上です。なんだか低価格なものばかりになってしまった。 Dying Light とかセールの度に買おうか迷うんだけど、PCのスペックが不安なのでPCを新調できたら買おうかな…。