使用するプラグイン
- mikoto2000/embulk-input-salesforce_bulk
- embulk-output-jdbc/embulk-output-postgresql at master · embulk/embulk-output-jdbc
salesforce_bulk は、自作プラグイン。設定方法などまだまだいろいろ変わりそうなので gem 化は考えていない。
Embulk に embulk-output-postgresql を入れる
$ ./bin/embulk gem install embulk-output-postgresql
2015-08-22 22:24:01.986 +0900: Embulk v0.7.2
Successfully installed embulk-output-postgresql-0.4.1
1 gem installed
PostgreSQL を準備する
VirtualBox に Debian minimal をインストール
...(略)
Debian に PostgreSQL 環境を構築
aptitude update
aptitude full-upgrade
aptitude install postgresql
su - postgres
psql
create database from_sfdc;
\c from_sfdc
create role from_sfdc with login;
alter user from_sfdc with password 'testing';
create table account_t("Id" varchar(20) primary key, "Name" varchar(100), "LastModifiedDate" timestamp);
exit
echo "listen_addresses = '*'" >> /etc/postgresql/9.4/main/postgres.conf
echo "host from_sfdc from_sfdc xxx.xxx.xxx.xxx/xx md5" >> /etc/postgresql/9.4/main/pg_hba.conf
service postgresql restart
Embulk 設定ファイルを作成
in:
type: salesforce_bulk
userName: USERNAME
password: PASSWORD+TOKEN
authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
objectType: Account
pollingIntervalMillisecond: 2500
querySelectFrom: SELECT Id,Name,LastModifiedDate FROM Account
columns:
- {type: string, name: Id}
- {type: string, name: Name}
- {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
out:
type: postgresql
host: 192.168.11.207
user: from_sfdc
password: "testing"
database: from_sfdc
table: account_t
mode: merge
実行
$ ./bin/embulk run -L ../embulk-input-salesforce_bulk/ ./sfdc_postgresql.yml
2015-08-23 03:08:56.728 +0900: Embulk v0.7.2
2015-08-23 03:08:58.254 +0900 [INFO] (transaction): Loaded plugin embulk-input-salesforce_bulk (0.1.0)
2015-08-23 03:08:58.289 +0900 [INFO] (transaction): Loaded plugin embulk-output-postgresql (0.4.1)
2015-08-23 03:08:58.356 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:08:58.406 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:08:58.408 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.408 +0900 [INFO] (transaction): Using merge mode
2015-08-23 03:08:58.427 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_ba0b162280_bl_tmp000"
2015-08-23 03:08:58.429 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.435 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "account_t_ba0b162280_bl_tmp000" ("Id" VARCHAR(20), "Name" VARCHAR(100), "LastModifiedDate" TIMESTAMP)
2015-08-23 03:08:58.436 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:08:58.466 +0900 [INFO] (transaction): Using merge keys: [Id]
2015-08-23 03:08:58.482 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-08-23 03:08:58.500 +0900 [INFO] (task-0000): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:08:58.506 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-08-23 03:08:58.507 +0900 [INFO] (task-0000): > 0.00 seconds
2015-08-23 03:08:58.508 +0900 [INFO] (task-0000): Copy SQL: COPY "account_t_ba0b162280_bl_tmp000" ("Id", "Name", "LastModifiedDate") FROM STDIN
2015-08-23 03:08:58.509 +0900 [WARN] (task-0000): An output plugin is compiled with old Embulk plugin API. Please update the plugin version using "embulk gem install" command, or contact a developer of the plugin to upgrade the plugin code using "embulk migrate" command: class org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput
2015-08-23 03:08:58.524 +0900 [INFO] (task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2015-08-23 03:08:58.960 +0900 [INFO] (task-0000): Login success.
2015-08-23 03:08:58.960 +0900 [INFO] (task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account'
2015-08-23 03:09:02.660 +0900 [INFO] (task-0000): Loading 30 rows (1,903 bytes)
2015-08-23 03:09:02.663 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 30 rows in total)
2015-08-23 03:09:02.730 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-08-23 03:09:02.731 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2015-08-23 03:09:02.735 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:09:02.736 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.736 +0900 [INFO] (transaction): SQL: WITH updated AS (UPDATE "account_t" SET "Id" = S."Id", "Name" = S."Name", "LastModifiedDate" = S."LastModifiedDate" FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_ba0b162280_bl_tmp000") S WHERE "account_t"."Id" = S."Id" RETURNING S."Id") INSERT INTO "account_t" ("Id", "Name", "LastModifiedDate") SELECT DISTINCT ON ("Id") * FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_ba0b162280_bl_tmp000") S WHERE NOT EXISTS (SELECT 1 FROM updated WHERE S."Id" = updated."Id")
2015-08-23 03:09:02.739 +0900 [INFO] (transaction): > 0.00 seconds (30 rows)
2015-08-23 03:09:02.746 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-23 03:09:02.751 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-23 03:09:02.752 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.752 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_ba0b162280_bl_tmp000"
2015-08-23 03:09:02.755 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-23 03:09:02.759 +0900 [INFO] (main): Committed.
2015-08-23 03:09:02.759 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
無事、PostgreSQL に格納された。よかった。
こんな感じのこともありました。『中の処理がわからんとシンタックスエラーっていわれても...』ってなって割と苦戦してました。
embulk-output-postgresql, 「Error: java.lang.RuntimeException: org.postgresql.util.PSQLException: ERROR: syntax error at or near ")"」って怒られるな。
— 大雪 命 (@mikoto2000) 2015, 8月 22
@mikoto2000 カラムをダブルクォーテーションで囲み忘れてて全部小文字になってた -> Schema のカラム名と食い違って無視される -> SQL 文のカラム定義が空になる -> Syntax error。のようだ。
— 大雪 命 (@mikoto2000) 2015, 8月 22
0 件のコメント:
コメントを投稿