2015/08/23

[Embulk] Trying out a data transfer from Salesforce to PostgreSQL with Embulk

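Plugins used

salesforce_bulk is a plugin I wrote myself. Its configuration options and other details are still likely to change quite a bit, so I am not planning to publish it as a gem.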

Install embulk-output-postgresql into Embulk

$ ./bin/embulk gem install embulk-output-postgresql
2015-08-22 22:24:01.986 +0900: Embulk v0.7.2
Successfully installed embulk-output-postgresql-0.4.1
1 gem installed

Prepare PostgreSQL

Install Debian minimal on VirtualBox

... (omitted)

Set up a PostgreSQL environment on Debian

aptitude update
aptitude full-upgrade
aptitude install postgresql

su - postgres
psql

create database from_sfdc;
\c from_sfdc
create role from_sfdc with login;
alter user from_sfdc with password 'testing';

create table account_t("Id" varchar(20) primary key, "Name" varchar(100), "LastModifiedDate" timestamp);

\q
exit
echo "listen_addresses = '*'" >> /etc/postgresql/9.4/main/postgresql.conf
echo "host from_sfdc from_sfdc xxx.xxx.xxx.xxx/xx md5" >> /etc/postgresql/9.4/main/pg_hba.conf
service postgresql restart
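
Because >> appends every time it runs, repeating the two echo commands leaves duplicate lines in the config files. A minimal sketch of a guard that appends a line only when it is not already there; append_once is a hypothetical helper name, not part of PostgreSQL or Debian.

```shell
# Hypothetical helper: append a line to a file only if that exact line
# is not already present.
#   $1 = exact line to add
#   $2 = target file (created if it does not exist)
append_once() {
  # -x: match the whole line, -F: treat the line as a fixed string, -q: quiet
  grep -qxF -- "$1" "$2" 2>/dev/null || printf '%s\n' "$1" >> "$2"
}

# Usage (as root), mirroring the first echo command:
#   append_once "listen_addresses = '*'" /etc/postgresql/9.4/main/postgresql.conf
```

Running the helper twice with the same arguments leaves a single copy of the line, so the setup steps can be repeated safely.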

Create the Embulk configuration file

in:
  type: salesforce_bulk
  userName: USERNAME
  password: PASSWORD+TOKEN
  authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
  objectType: Account
  pollingIntervalMillisecond: 2500
  querySelectFrom: SELECT Id,Name,LastModifiedDate FROM Account
  columns:
  - {type: string, name: Id}
  - {type: string, name: Name}
  - {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
out:
  type: postgresql
  host: 192.168.11.207
  user: from_sfdc
  password: "testing"
  database: from_sfdc
  table: account_t
  mode: merge
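
In the format given for LastModifiedDate, %F is shorthand for %Y-%m-%d, %T for %H:%M:%S, %L is milliseconds and %Z is the time zone. The sketch below is only a rough shape check for values against that pattern, assuming Salesforce returns values such as 2015-08-22T13:24:01.000Z (a made-up sample, not taken from this run); matches_sfdc_timestamp is a hypothetical helper name and the regular expression is an approximation of what the format accepts.

```shell
# Hypothetical helper: succeed if $1 looks like a value that the format
# '%FT%T.%L%Z' is meant to parse (date, 'T', time, '.', 3-digit millis, zone).
matches_sfdc_timestamp() {
  printf '%s\n' "$1" | grep -Eq \
    '^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}([A-Za-z]+|[+-][0-9]{2}:?[0-9]{2})$'
}

# Usage:
#   matches_sfdc_timestamp "2015-08-22T13:24:01.000Z" && echo "shape ok"
```

A value without the milliseconds or the zone part fails the check, which is the kind of mismatch worth ruling out before running the transfer.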

Run

$ ./bin/embulk run -L ../embulk-input-salesforce_bulk/ ./sfdc_postgresql.yml
2015-08-23 03:08:56.728 +0900: Embulk v0.7.2
2015-08-23 03:08:58.254 +0900 [INFO] (transaction): Loaded plugin embulk-input-salesforce_bulk (0.1.0)
2015-08-23 03:08:58.289 +0900 [INFO] (transaction): Loaded plugin embulk-output-postgresql (0.4.1)
2015-08-23 03:08:58.356 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:08:58.406 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:08:58.408 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.408 +0900 [INFO] (transaction): Using merge mode
2015-08-23 03:08:58.427 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_ba0b162280_bl_tmp000"
2015-08-23 03:08:58.429 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.435 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "account_t_ba0b162280_bl_tmp000" ("Id" VARCHAR(20), "Name" VARCHAR(100), "LastModifiedDate" TIMESTAMP)
2015-08-23 03:08:58.436 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.466 +0900 [INFO] (transaction): Using merge keys: [Id]
2015-08-23 03:08:58.482 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-08-23 03:08:58.500 +0900 [INFO] (task-0000): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:08:58.506 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-08-23 03:08:58.507 +0900 [INFO] (task-0000): > 0.00 seconds
2015-08-23 03:08:58.508 +0900 [INFO] (task-0000): Copy SQL: COPY "account_t_ba0b162280_bl_tmp000" ("Id", "Name", "LastModifiedDate") FROM STDIN
2015-08-23 03:08:58.509 +0900 [WARN] (task-0000): An output plugin is compiled with old Embulk plugin API. Please update the plugin version using "embulk gem install" command, or contact a developer of the plugin to upgrade the plugin code using "embulk migrate" command: class org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput
2015-08-23 03:08:58.524 +0900 [INFO] (task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2015-08-23 03:08:58.960 +0900 [INFO] (task-0000): Login success.
2015-08-23 03:08:58.960 +0900 [INFO] (task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account'
2015-08-23 03:09:02.660 +0900 [INFO] (task-0000): Loading 30 rows (1,903 bytes)
2015-08-23 03:09:02.663 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 30 rows in total)
2015-08-23 03:09:02.730 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-08-23 03:09:02.731 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2015-08-23 03:09:02.735 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:09:02.736 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.736 +0900 [INFO] (transaction): SQL: WITH updated AS (UPDATE "account_t" SET "Id" = S."Id", "Name" = S."Name", "LastModifiedDate" = S."LastModifiedDate" FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_ba0b162280_bl_tmp000") S WHERE "account_t"."Id" = S."Id" RETURNING S."Id") INSERT INTO "account_t" ("Id", "Name", "LastModifiedDate") SELECT DISTINCT ON ("Id") * FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_ba0b162280_bl_tmp000") S WHERE NOT EXISTS (SELECT 1 FROM updated WHERE S."Id" = updated."Id")
2015-08-23 03:09:02.739 +0900 [INFO] (transaction): > 0.00 seconds (30 rows)
2015-08-23 03:09:02.746 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:09:02.751 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:09:02.752 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.752 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_ba0b162280_bl_tmp000"
2015-08-23 03:09:02.755 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.759 +0900 [INFO] (main): Committed.
2015-08-23 03:09:02.759 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

The data was stored in PostgreSQL without any problems. What a relief.
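
To confirm the row count without reading the whole log, the "loaded N rows in total" line can be pulled out with sed. A minimal sketch, assuming the Embulk output was saved to a file (embulk.log is a hypothetical name); loaded_rows is a hypothetical helper.

```shell
# Hypothetical helper: read an Embulk log on stdin and print the number
# from the last "(loaded N rows in total)" message, if any.
loaded_rows() {
  sed -n 's/.*(loaded \([0-9,]*\) rows in total).*/\1/p' | tail -n 1
}

# Usage, assuming the log was saved as embulk.log:
#   loaded_rows < embulk.log
```

The printed number can then be compared with a count(*) on account_t for a first-time load into an empty table.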

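There were moments like this along the way, too. I kept thinking, "Being told it's a syntax error doesn't help when I can't see what is happening inside...", and I struggled with it quite a bit.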
