昨日の下記記事の続き。 next config がうまくいくかどうか確認していなかったのでそのあたりを確認。
親知らずの日記: [Embulk] Embulk で Salesforce から PostgreSQL にデータを転送してみた
startRowMarkerName を追加。 更新するたびに増えていく値であれば何でもよい。 とはいえ一番簡単なのは最終更新日であろう。
in:
type: salesforce_bulk
userName: USERNAME
password: PASSWORD+TOKEN
authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
objectType: Account
pollingIntervalMillisecond: 2500
querySelectFrom: SELECT Id,Name,LastModifiedDate FROM Account
columns:
- {type: string, name: Id}
- {type: string, name: Name}
- {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
startRowMarkerName: LastModifiedDate
out: {type: postgresql, host: 192.168.11.207, user: from_sfdc, password: testing,
database: from_sfdc, table: account_t, mode: merge}
$ ./bin/embulk run -L ../embulk-input-salesforce_bulk/ ./sfdc_postgresql.yml -o ./sfdc_postgresql.yml
2015-08-24 22:39:52.733 +0900: Embulk v0.7.3
2015-08-24 22:39:54.269 +0900 [INFO] (transaction): Loaded plugin embulk-input-salesforce_bulk (0.1.0)
2015-08-24 22:39:54.309 +0900 [INFO] (transaction): Loaded plugin embulk-output-postgresql (0.4.1)
2015-08-24 22:39:54.372 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:39:54.425 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:39:54.427 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:54.427 +0900 [INFO] (transaction): Using merge mode
2015-08-24 22:39:54.446 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_aa0b9f76c0_bl_tmp000"
2015-08-24 22:39:54.447 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:54.455 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "account_t_aa0b9f76c0_bl_tmp000" ("Id" VARCHAR(20), "Name" VARCHAR(100), "LastModifiedDate" TIMESTAMP)
2015-08-24 22:39:54.457 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:54.468 +0900 [INFO] (transaction): Using merge keys: [Id]
2015-08-24 22:39:54.488 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-08-24 22:39:54.511 +0900 [INFO] (task-0000): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:39:54.521 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-08-24 22:39:54.522 +0900 [INFO] (task-0000): > 0.00 seconds
2015-08-24 22:39:54.523 +0900 [INFO] (task-0000): Copy SQL: COPY "account_t_aa0b9f76c0_bl_tmp000" ("Id", "Name", "LastModifiedDate") FROM STDIN
2015-08-24 22:39:54.525 +0900 [WARN] (task-0000): An output plugin is compiled with old Embulk plugin API. Please update the plugin version using "embulk gem install" command, or contact a developer of the plugin to upgrade the plugin code using "embulk migrate" command: class org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput
2015-08-24 22:39:54.548 +0900 [INFO] (task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2015-08-24 22:39:55.016 +0900 [INFO] (task-0000): Login success.
2015-08-24 22:39:55.017 +0900 [INFO] (task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account'
2015-08-24 22:39:58.396 +0900 [INFO] (task-0000): Loading 30 rows (1,903 bytes)
2015-08-24 22:39:58.399 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 30 rows in total)
2015-08-24 22:39:58.524 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-08-24 22:39:58.525 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2015-08-24 22:39:58.719 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:39:58.720 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:58.720 +0900 [INFO] (transaction): SQL: WITH updated AS (UPDATE "account_t" SET "Id" = S."Id", "Name" = S."Name", "LastModifiedDate" = S."LastModifiedDate" FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_aa0b9f76c0_bl_tmp000") S WHERE "account_t"."Id" = S."Id" RETURNING S."Id") INSERT INTO "account_t" ("Id", "Name", "LastModifiedDate") SELECT DISTINCT ON ("Id") * FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_aa0b9f76c0_bl_tmp000") S WHERE NOT EXISTS (SELECT 1 FROM updated WHERE S."Id" = updated."Id")
2015-08-24 22:39:58.723 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:58.729 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:39:58.735 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:39:58.736 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:58.736 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_aa0b9f76c0_bl_tmp000"
2015-08-24 22:39:58.738 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:39:58.740 +0900 [INFO] (main): Committed.
2015-08-24 22:39:58.741 +0900 [INFO] (main): Next config diff: {"in":{"start_row_marker":"2015-08-24T13:31:58.000Z"},"out":{}}
ちゃんと 30 行ロードされたようだ。
2015-08-24 22:39:58.396 +0900 [INFO] (task-0000): Loading 30 rows (1,903 bytes)
2015-08-24 22:39:58.399 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 30 rows in total)
そして更新された設定ファイルがこちら。
in:
type: salesforce_bulk
userName: USERNAME
password: PASSWORD+TOKEN
authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
objectType: Account
pollingIntervalMillisecond: 2500
querySelectFrom: SELECT Id,Name,LastModifiedDate FROM Account
columns:
- {type: string, name: Id}
- {type: string, name: Name}
- {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
startRowMarkerName: LastModifiedDate
start_row_marker: '2015-08-24T13:31:58.000Z'
out: {type: postgresql, host: 192.168.11.207, user: from_sfdc, password: testing,
database: from_sfdc, table: account_t, mode: merge}
Salesforce 側を何も更新せずに実行。
$ ./bin/embulk run -L ../embulk-input-salesforce_bulk/ ./sfdc_postgresql.yml -o ./sfdc_postgresql.yml
2015-08-24 22:42:51.365 +0900: Embulk v0.7.3
2015-08-24 22:42:52.903 +0900 [INFO] (transaction): Loaded plugin embulk-input-salesforce_bulk (0.1.0)
2015-08-24 22:42:52.930 +0900 [INFO] (transaction): Loaded plugin embulk-output-postgresql (0.4.1)
2015-08-24 22:42:52.991 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:42:53.040 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:42:53.043 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:53.043 +0900 [INFO] (transaction): Using merge mode
2015-08-24 22:42:53.072 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_5c31b5d480_bl_tmp000"
2015-08-24 22:42:53.074 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:53.081 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "account_t_5c31b5d480_bl_tmp000" ("Id" VARCHAR(20), "Name" VARCHAR(100), "LastModifiedDate" TIMESTAMP)
2015-08-24 22:42:53.104 +0900 [INFO] (transaction): > 0.02 seconds
2015-08-24 22:42:53.121 +0900 [INFO] (transaction): Using merge keys: [Id]
2015-08-24 22:42:53.145 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-08-24 22:42:53.170 +0900 [INFO] (task-0000): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:42:53.179 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-08-24 22:42:53.180 +0900 [INFO] (task-0000): > 0.00 seconds
2015-08-24 22:42:53.181 +0900 [INFO] (task-0000): Copy SQL: COPY "account_t_5c31b5d480_bl_tmp000" ("Id", "Name", "LastModifiedDate") FROM STDIN
2015-08-24 22:42:53.183 +0900 [WARN] (task-0000): An output plugin is compiled with old Embulk plugin API. Please update the plugin version using "embulk gem install" command, or contact a developer of the plugin to upgrade the plugin code using "embulk migrate" command: class org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput
2015-08-24 22:42:53.203 +0900 [INFO] (task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2015-08-24 22:42:53.653 +0900 [INFO] (task-0000): Login success.
2015-08-24 22:42:53.654 +0900 [INFO] (task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account WHERE LastModifiedDate > 2015-08-24T13:31:58.000Z'
2015-08-24 22:42:57.647 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-08-24 22:42:57.648 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2015-08-24 22:42:57.651 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:42:57.652 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:57.652 +0900 [INFO] (transaction): SQL: WITH updated AS (UPDATE "account_t" SET "Id" = S."Id", "Name" = S."Name", "LastModifiedDate" = S."LastModifiedDate" FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_5c31b5d480_bl_tmp000") S WHERE "account_t"."Id" = S."Id" RETURNING S."Id") INSERT INTO "account_t" ("Id", "Name", "LastModifiedDate") SELECT DISTINCT ON ("Id") * FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_5c31b5d480_bl_tmp000") S WHERE NOT EXISTS (SELECT 1 FROM updated WHERE S."Id" = updated."Id")
2015-08-24 22:42:57.654 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:57.660 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:42:57.668 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:42:57.669 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:57.669 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_5c31b5d480_bl_tmp000"
2015-08-24 22:42:57.672 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:42:57.675 +0900 [INFO] (main): Committed.
2015-08-24 22:42:57.676 +0900 [INFO] (main): Next config diff: {"in":{"start_row_marker":"2015-08-24T13:31:58.000Z"},"out":{}}
「Loading XXX rows」がない。何も読み込まれなかったようだ。
取引先を一つ修正してから実行。
$ ./bin/embulk run -L ../embulk-input-salesforce_bulk/ ./sfdc_postgresql.yml -o ./sfdc_postgresql.yml
2015-08-24 22:45:26.724 +0900: Embulk v0.7.3
2015-08-24 22:45:28.289 +0900 [INFO] (transaction): Loaded plugin embulk-input-salesforce_bulk (0.1.0)
2015-08-24 22:45:28.321 +0900 [INFO] (transaction): Loaded plugin embulk-output-postgresql (0.4.1)
2015-08-24 22:45:28.392 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:45:28.448 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:45:28.450 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:28.450 +0900 [INFO] (transaction): Using merge mode
2015-08-24 22:45:28.468 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_f80c474f80_bl_tmp000"
2015-08-24 22:45:28.470 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:28.476 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS "account_t_f80c474f80_bl_tmp000" ("Id" VARCHAR(20), "Name" VARCHAR(100), "LastModifiedDate" TIMESTAMP)
2015-08-24 22:45:28.477 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:28.523 +0900 [INFO] (transaction): Using merge keys: [Id]
2015-08-24 22:45:28.542 +0900 [INFO] (transaction): {done: 0 / 1, running: 0}
2015-08-24 22:45:28.561 +0900 [INFO] (task-0000): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:45:28.569 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-08-24 22:45:28.570 +0900 [INFO] (task-0000): > 0.00 seconds
2015-08-24 22:45:28.570 +0900 [INFO] (task-0000): Copy SQL: COPY "account_t_f80c474f80_bl_tmp000" ("Id", "Name", "LastModifiedDate") FROM STDIN
2015-08-24 22:45:28.572 +0900 [WARN] (task-0000): An output plugin is compiled with old Embulk plugin API. Please update the plugin version using "embulk gem install" command, or contact a developer of the plugin to upgrade the plugin code using "embulk migrate" command: class org.embulk.output.jdbc.AbstractJdbcOutputPlugin$PluginPageOutput
2015-08-24 22:45:28.587 +0900 [INFO] (task-0000): Try login to 'https://login.salesforce.com/services/Soap/u/34.0'.
2015-08-24 22:45:29.355 +0900 [INFO] (task-0000): Login success.
2015-08-24 22:45:29.355 +0900 [INFO] (task-0000): Send request : 'SELECT Id,Name,LastModifiedDate FROM Account WHERE LastModifiedDate > 2015-08-24T13:31:58.000Z'
2015-08-24 22:45:32.773 +0900 [INFO] (task-0000): Loading 1 rows (57 bytes)
2015-08-24 22:45:32.777 +0900 [INFO] (task-0000): > 0.00 seconds (loaded 1 rows in total)
2015-08-24 22:45:32.958 +0900 [INFO] (transaction): {done: 1 / 1, running: 0}
2015-08-24 22:45:32.959 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=28800}
2015-08-24 22:45:32.964 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:45:32.965 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:32.965 +0900 [INFO] (transaction): SQL: WITH updated AS (UPDATE "account_t" SET "Id" = S."Id", "Name" = S."Name", "LastModifiedDate" = S."LastModifiedDate" FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_f80c474f80_bl_tmp000") S WHERE "account_t"."Id" = S."Id" RETURNING S."Id") INSERT INTO "account_t" ("Id", "Name", "LastModifiedDate") SELECT DISTINCT ON ("Id") * FROM (SELECT "Id", "Name", "LastModifiedDate" FROM "account_t_f80c474f80_bl_tmp000") S WHERE NOT EXISTS (SELECT 1 FROM updated WHERE S."Id" = updated."Id")
2015-08-24 22:45:32.967 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:32.973 +0900 [INFO] (transaction): Connecting to jdbc:postgresql://192.168.11.207:5432/from_sfdc options {user=from_sfdc, tcpKeepAlive=true, loginTimeout=300, socketTimeout=1800}
2015-08-24 22:45:32.978 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-08-24 22:45:32.979 +0900 [INFO] (transaction): > 0.00 seconds
2015-08-24 22:45:32.980 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS "account_t_f80c474f80_bl_tmp000"
2015-08-24 22:45:32.993 +0900 [INFO] (transaction): > 0.01 seconds
2015-08-24 22:45:32.997 +0900 [INFO] (main): Committed.
2015-08-24 22:45:32.997 +0900 [INFO] (main): Next config diff: {"in":{"start_row_marker":"2015-08-24T13:45:28.000Z"},"out":{}}
うむ、良いのではないだろうか。
PostgreSQL のカラムが更新されていることも確認。embulk-input-postgresql の mode も merge で良いようだ。