合 使用OGG传统模式将Oracle同步到kafka(全量+增量)
Tags: OracleOGG实时同步Kafka异构迁移数据同步
环境准备
Oracle环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | -- 创建专用网络 docker network create --subnet=172.72.7.0/24 ora-network -- oracle 压测工具 docker pull lhrbest/lhrdbbench:1.0 docker rm -f lhrdbbench docker run -d --name lhrdbbench -h lhrdbbench \ --net=ora-network --ip 172.72.7.33 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/lhrdbbench:1.0 \ /usr/sbin/init -- Oracle 12c docker rm -f lhrora1221 docker run -itd --name lhrora1221 -h lhrora1221 \ --net=ora-network --ip 172.72.7.34 \ -p 1526:1521 -p 3396:3389 \ --privileged=true \ lhrbest/oracle_12cr2_ee_lhr_12.2.0.1:2.0 init -- oracle数据库配置 1.开启数据库归档--如果没有开启 2.开启数据库级别附加日志--如果没有开始最小附加日志 3.开启强制日志--如果没有开启强制日志 4.设置ENABLE_GOLDENGATE_REPLICAT参数为TRUE 5.创建OGG用户包括包括源端用户、目标端用户以及OGG抽取用户 alter database add supplemental log data; alter database add supplemental log data (all) columns; alter database force logging; alter system set enable_goldengate_replication=TRUE; select name,supplemental_log_data_min , force_logging, log_mode from v$database; alter system set streams_pool_size = 128M; alter system set sga_max_size = 2g scope=spfile; alter system set sga_target = 2g scope=spfile; alter system set pga_aggregate_target=1g; startup force -- OGG管理用户 CREATE USER ogg identified by lhr; GRANT DBA to ogg; grant SELECT ANY DICTIONARY to ogg; GRANT EXECUTE ON SYS.DBMS_LOCK TO ogg; grant select any transaction to ogg; grant select any table to ogg; grant flashback any table to ogg; grant alter any table to ogg; exec dbms_goldengate_auth.grant_admin_privilege('OGG','*',TRUE); -- 业务用户 CREATE USER lhr identified by lhr; alter user lhr identified by lhr; GRANT DBA to lhr ; grant SELECT ANY DICTIONARY to lhr; GRANT EXECUTE ON SYS.DBMS_LOCK TO lhr; -- 启动监听 vi /u01/app/oracle/product/12.2.0.1/dbhome_1/network/admin/listener.ora lsnrctl start lsnrctl status |
Oracle数据初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | -- 源端数据初始化 /usr/local/swingbench/bin/oewizard -s -create -c /usr/local/swingbench/wizardconfigs/oewizard.xml -create \ -version 2.0 -cs //172.72.7.34/lhrsdb -dba "sys as sysdba" -dbap lhr -dt thin \ -ts users -u lhr -p lhr -allindexes -scale 0.0001 -tc 16 -v -cl col TABLE_NAME format a30 SELECT a.table_name,a.num_rows FROM dba_tables a where a.OWNER='LHR' ; select object_type,count(*) from dba_objects where owner='LHR' group by object_type; select object_type,status,count(*) from dba_objects where owner='LHR' group by object_type,status; select sum(bytes)/1024/1024 from dba_segments where owner='LHR'; -- 检查键是否正确:https://www.dbaup.com/ogg-01296-biaoyouzhujianhuoweiyijiandanshirengranshiyongquanbulielaijiexixing.html -- 否则OGG启动后,会报错:OGG-01296、OGG-06439、OGG-01169 Encountered an update where all key columns for target table LHR.ORDER_ITEMS are not present. select owner, constraint_name, constraint_type, status, validated from dba_constraints where owner='LHR' and VALIDATED='NOT VALIDATED'; select 'alter table lhr.'||TABLE_NAME||' enable validate constraint '||CONSTRAINT_NAME||';' from dba_constraints where owner='LHR' and VALIDATED='NOT VALIDATED'; -- 删除外键 SELECT 'ALTER TABLE LHR.'|| D.TABLE_NAME ||' DROP constraint '|| D.CONSTRAINT_NAME||';' FROM DBA_constraints d where owner='LHR' and d.CONSTRAINT_TYPE='R'; sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb @/oggoracle/demo_ora_create.sql @/oggoracle/demo_ora_insert.sql SQL> select * from tcustmer; CUST NAME CITY ST ---- ------------------------------ -------------------- -- WILL BG SOFTWARE CO. SEATTLE WA JANE ROCKY FLYER INC. DENVER CO -- 创建2个clob和blob类型的表 sqlplus lhr/lhr@172.72.7.34:1521/lhrsdb @/oggoracle/demo_ora_lob_create.sql exec testing_lobs; select * from lhr.TSRSLOB; drop table IMAGE_LOB; CREATE TABLE IMAGE_LOB ( T_ID VARCHAR2 (5) NOT NULL, T_IMAGE BLOB, T_CLOB CLOB ); -- 插入blob文件 CREATE OR REPLACE DIRECTORY D1 AS '/home/oracle/'; grant all on DIRECTORY D1 TO PUBLIC; CREATE OR REPLACE NONEDITIONABLE PROCEDURE IMG_INSERT(TID VARCHAR2, FILENAME VARCHAR2, name VARCHAR2) AS F_LOB BFILE; B_LOB BLOB; BEGIN INSERT INTO IMAGE_LOB (T_ID, T_IMAGE,T_CLOB) VALUES (TID, EMPTY_BLOB(),name) RETURN T_IMAGE INTO B_LOB; F_LOB := BFILENAME('D1', FILENAME); DBMS_LOB.FILEOPEN(F_LOB, DBMS_LOB.FILE_READONLY); DBMS_LOB.LOADFROMFILE(B_LOB, F_LOB, DBMS_LOB.GETLENGTH(F_LOB)); DBMS_LOB.FILECLOSE(F_LOB); COMMIT; END; / BEGIN IMG_INSERT('1','1.jpg','dbaup.com'); IMG_INSERT('2','2.jpg','www.dbaup.com'); END; / select * from IMAGE_LOB; ----- oracle所有表 SQL> select * from tab; TNAME TABTYPE CLUSTERID ------------------------------ ------- ---------- ADDRESSES TABLE CARD_DETAILS TABLE CUSTOMERS TABLE IMAGE_LOB TABLE INVENTORIES TABLE LOGON TABLE ORDERENTRY_METADATA TABLE ORDERS TABLE ORDER_ITEMS TABLE PRODUCTS VIEW PRODUCT_DESCRIPTIONS TABLE PRODUCT_INFORMATION TABLE PRODUCT_PRICES VIEW TCUSTMER TABLE TCUSTORD TABLE TSRSLOB TABLE TTRGVAR TABLE WAREHOUSES TABLE 18 rows selected. SELECT COUNT(*) FROM LHR.ADDRESSES UNION ALL SELECT COUNT(*) FROM LHR.CARD_DETAILS UNION ALL SELECT COUNT(*) FROM LHR.CUSTOMERS UNION ALL SELECT COUNT(*) FROM LHR.IMAGE_LOB UNION ALL SELECT COUNT(*) FROM LHR.INVENTORIES UNION ALL SELECT COUNT(*) FROM LHR.LOGON UNION ALL SELECT COUNT(*) FROM LHR.ORDERENTRY_METADATA UNION ALL SELECT COUNT(*) FROM LHR.ORDERS UNION ALL SELECT COUNT(*) FROM LHR.ORDER_ITEMS UNION ALL SELECT COUNT(*) FROM LHR.PRODUCT_DESCRIPTIONS UNION ALL SELECT COUNT(*) FROM LHR.PRODUCT_INFORMATION UNION ALL SELECT COUNT(*) FROM LHR.TCUSTMER UNION ALL SELECT COUNT(*) FROM LHR.TCUSTORD UNION ALL SELECT COUNT(*) FROM LHR.TSRSLOB UNION ALL SELECT COUNT(*) FROM LHR.TTRGVAR UNION ALL SELECT COUNT(*) FROM LHR.WAREHOUSES ; COUNT(*) ---------- 150 150 100 2 900724 239 4 143 773 1000 1000 2 2 1 0 1000 16 rows selected. |
最终,在Oracle端共包括16张表,2个视图,其中2个表TSRSLOB和IMAGE_LOB包括了blob和clob字段。
目标端kafka环境
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | docker pull lhrbest/kafka:3.2.0 docker rm -f lhrkafka docker run -itd --name lhrkafka -h lhrkafka \ --net=ora-network --ip 172.72.7.44 \ -p 9092:9092 -p 2181:2181 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/kafka:3.2.0 \ /usr/sbin/init docker exec -it lhrkafka bash -- 启动(默认已启动) /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & [root@lhrkafka /]# jps 161 QuorumPeerMain 162 Kafka 1127 Jps [root@lhrkafka /]# ps -ef|grep java root 161 1 7 14:20 ? 00:00:03 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/local/kafka/config/zookeeper.properties root 162 1 30 14:20 ? 00:00:14 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xloggc:/usr/local/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999 -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-json-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-runtime-3.2.0.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.2.0.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-core-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-databind-2.12.6.1.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-base-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotations-2.12.6.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.12.6.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/usr/local/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/local/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-continuation-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-security-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.4.44.v20210927.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka/bin/../libs/kafka_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-clients-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-raft-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.2.0.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.2.0.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/../libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-codec-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-resolver-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-tcnative-classes-2.0.46.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-epoll-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.73.Final.jar:/usr/local/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/usr/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/trogdor-3.2.0.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/local/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties root 1167 961 0 14:20 pts/1 00:00:00 grep --color=auto java [root@lhrkafka /]# netstat -tulnp | grep java tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:9999 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:37691 0.0.0.0:* LISTEN 161/java tcp 0 0 0.0.0.0:40831 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:38977 0.0.0.0:* LISTEN 162/java tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 162/java |
kafka默认占用9092端口,ZK默认占用2181端口。
kafka日志:
1 | tailf /usr/local/kafka/logs/server.log |
测试一下,在服务器上创建一个topic为test,然后生产几条信息:
1 2 3 4 5 6 7 8 9 10 | -- 生产者 /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test >hello >world -- 在另一台机器上,开启消费者控制台,监听test的topic,发现可以收到数据 /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning hello word |
源端OGG for oracle环境
OGG下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | -- OGG机器,同时包含oracle和bigdata docker rm -f lhrogg21all docker run -d --name lhrogg21all -h lhrogg21all \ --net=ora-network --ip 172.72.7.7 \ -p 39391:3389 -p 37809-37819:7809-7819 \ -v /sys/fs/cgroup:/sys/fs/cgroup \ --privileged=true lhrbest/ogg21all:v5.0 \ /usr/sbin/init docker exec -it lhrogg21all bash su - oracle ogg add credentialstore alter credentialstore add user ogg@172.72.7.34/lhrsdb, password lhr alias ora12c INFO CREDENTIALSTORE dblogin useridalias ora12c sqlplus ogg/lhr@172.72.7.34/lhrsdb dblogin useridalias ora12c ADD SCHEMATRANDATA LHR INFO SCHEMATRANDATA LHR list tables LHR.* GGSCI (lhrogg21all as ogg@lhrsdb) 6> INFO SCHEMATRANDATA LHR 2022-06-28 15:26:46 INFO OGG-06480 Schema level supplemental logging, excluding non-validated keys, is enabled on schema "LHR". 2022-06-28 15:26:46 INFO OGG-01980 Schema level supplemental logging is enabled on schema "LHR" for all scheduling columns. 2022-06-28 15:26:46 INFO OGG-10462 Schema "LHR" have 16 prepared tables for instantiation. GGSCI (lhrogg21all as ogg@lhrsdb) 7> list tables LHR.* "LHR"."ADDRESSES" "LHR"."CARD_DETAILS" "LHR"."CUSTOMERS" "LHR"."IMAGE_LOB" "LHR"."INVENTORIES" "LHR"."LOGON" "LHR"."ORDERENTRY_METADATA" "LHR"."ORDERS" "LHR"."ORDER_ITEMS" "LHR"."PRODUCTS" "LHR"."PRODUCT_DESCRIPTIONS" "LHR"."PRODUCT_INFORMATION" "LHR"."PRODUCT_PRICES" "LHR"."TCUSTMER" "LHR"."TCUSTORD" "LHR"."TSRSLOB" "LHR"."TTRGVAR" "LHR"."WAREHOUSES" Found 18 tables matching list criteria. -- 有2个是视图,后期需要排除掉 tableexclude LHR.PRODUCTS; tableexclude LHR.PRODUCT_PRICES; GGSCI (lhrogg21all as ogg@lhrsdb) 10> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING JAGENT STOPPED PMSRVR STOPPED |
目标端OGG for bigdata环境
OGG下载地址:https://www.oracle.com/middleware/technologies/goldengate-downloads.html
1 2 3 4 5 6 7 8 9 10 | docker exec -it lhrogg21all bash su - bigdata ogg -- 解决OGG-01201:Error reported by MGR : Access denied edit params mgr port 8809 ACCESSRULE, PROG *, IPADDR *, ALLOW |
全量同步
注意:在此阶段,源端需要停业务,不能产生新数据。
Oracle端配置
OGG初始化可以将数据直接输入目标端,也可以先抽取到本地,然后再输入目标端,这里我们直接同步到目标端的kafka里,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | -- oracle端 edit params ext0 EXTRACT ext0 USERIDALIAS ora12c rmthost 127.0.0.1,mgrport 8809 rmttask replicat,group rep0 tableexclude LHR.PRODUCTS; tableexclude LHR.PRODUCT_PRICES; TABLE LHR.*; add extract ext0 ,sourceistable -- 启动mgr start mgr |
- SOURCEISTABLE指示Extract直接从源表中读取完整的记录。
kafka端配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | edit params rep0 REPLICAT rep0 targetdb libfile libggjava.so set property=./dirprm/kafka.props REPLACEBADCHAR SKIP SOURCECHARSET OVERRIDE GBK map LHR.*, target LHR.*; add replicat rep0 ,specialrun start mgr -- 配置kafka参数 vi /oggbigdata/dirprm/kafka.props gg.handler.kafkahandler.schemaTopicName=LHR_OGG vi /oggbigdata/dirprm/custom_kafka_producer.properties bootstrap.servers=172.72.7.44:9092 |
SPECIALRUN –将replicat设定为一次性运行,不需要checkpoint
END RUNTIME –当load完成后终结replicat
gg.handler.kafkahandler.topicMappingTemplate:kafka topic名称的映射,指定topic名称,也可以通过占位符的方式,例如${tableName},每一张表对应一个topic。