The task was to copy the data in Infoxmix on the production environment, unchanged, to another Oracle database server, and then sanitize (mask) the copied data. To keep manual intervention to a minimum, the system was designed to do the copy with Java code. The idea is outlined below.
First, a Connection is opened between the code and the production library, and the rows read from it are held in a ResultSet; then a second Connection is opened to the development library. Rows taken out of the ResultSet are inserted into the development library through that connection (TestConnection), which completes the copy. Once the code was written and run, it turned out to be far too slow: only about a thousand rows per second. The production library holds hundreds of millions of rows, so at that rate the sync would never finish, and batching with PreparedStatement did not improve the speed much either. So I wondered whether multiple threads would help: several workers should get the job done faster than one.
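For reference, a minimal sketch of that first, single-threaded version might look like the following. The table name xx, its two columns, and the way the two Connections are obtained are placeholders for illustration only, not the project's real code:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class SingleThreadCopy {
    // Copies every row of the (assumed) two-column table xx from src to dst.
    static void copyAll(Connection src, Connection dst) throws Exception {
        dst.setAutoCommit(false);                       // commit manually, in batches
        Statement query = src.createStatement();
        ResultSet rs = query.executeQuery("SELECT col1, col2 FROM xx");
        PreparedStatement insert =
                dst.prepareStatement("INSERT INTO xx (col1, col2) VALUES (?, ?)");
        int count = 0;
        while (rs.next()) {
            insert.setString(1, rs.getString(1));
            insert.setString(2, rs.getString(2));
            insert.addBatch();
            if (++count % 1000 == 0) {                  // flush a batch every 1,000 rows
                insert.executeBatch();
                dst.commit();
            }
        }
        insert.executeBatch();                          // flush the remainder
        dst.commit();
        rs.close();
        query.close();
        insert.close();
    }
}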
Suppose the production library holds 10,000 rows and I start 5 threads, each assigned 2,000 rows, all inserting into the development library at the same time. Since Oracle handles high concurrency well, the speed should go up many times over. I recoded along these lines: the batch commits every 10,000 rows, and the counter for inserted rows is a java.util.concurrent.atomic.AtomicLong. Once the program ran, the transfer was blazingly fast, with CPU utilization at 70%~90%. It now copies about 500,000 rows per second, and within a few minutes every one of the hundreds of millions of rows had been copied to the target library without a single row missing.
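The splitting itself is just arithmetic over the id column. A sketch of the idea, using the example figures above (10,000 rows, 5 threads; the table name xx is again a placeholder):

public class RangeSplitDemo {
    public static void main(String[] args) {
        long totalNum = 10_000;          // example row count
        int syncThreadNum = 5;           // example thread count
        // use ceil so the last thread also covers any remainder
        long perThread = (long) Math.ceil((double) totalNum / syncThreadNum); // 2,000
        for (int i = 0; i < syncThreadNum; i++) {
            long from = i * perThread + 1;      // 1, 2001, 4001, 6001, 8001
            long to = from + perThread - 1;     // 2000, 4000, 6000, 8000, 10000
            System.out.println("SELECT * FROM xx WHERE id BETWEEN " + from + " AND " + to);
        }
    }
}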
For the query I used a statement like this:
String queryStr = "SELECT * FROM xx";
Statement coreStmt = coreConnection.createStatement();
ResultSet coreRs = coreStmt.executeQuery(queryStr);
An intern asked: if the xx table holds tens of millions of rows and you query them all into a ResultSet, won't memory overflow? JDBC was designed with this in mind: it does not pull all of the data at once, it only fetches a portion of the rows into the ResultSet, and once those rows are "used up" it automatically fetches the next batch. You can call setFetchSize(int rows) to give the ResultSet a hint about how many rows to fetch from the database each time. I don't recommend relying on it, though, because the JDBC driver adjusts the fetch size to the actual situation. Performance also depends directly on the network bandwidth.
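If you do want to give the driver that hint, the call is a one-liner on the Statement. The value 5000 below is only an example, not a recommendation:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class FetchSizeDemo {
    static void scan(Connection coreConnection) throws Exception {
        Statement stmt = coreConnection.createStatement();
        stmt.setFetchSize(5000);                 // hint: fetch 5,000 rows per round trip
        ResultSet rs = stmt.executeQuery("SELECT * FROM xx");
        while (rs.next()) {
            // process the current row; the driver refills the ResultSet behind the scenes
        }
        rs.close();
        stmt.close();
    }
}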
Related code:
package com.dlbank.domain;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

/**
 * <p>Title: Data synchronization class</p>
 * <p>Description: Synchronizes data from the production core library to the development library</p>
 * @author Tank Zhang
 */
public class CoreDataSyncImpl implements CoreDataSync {

    private List<String> coreTBNames;               // core tables to synchronize
    private ConnectionFactory connectionFactory;
    private Logger log = Logger.getLogger(getClass());
    private AtomicLong currentSynCount = new AtomicLong(0L); // rows synchronized so far
    private int syncThreadNum;                      // number of sync threads

    @Override
    public void syncData(int businessType) throws Exception {
        for (String tmpTBName : coreTBNames) {
            log.info("Start synchronizing core table " + tmpTBName);
            currentSynCount.set(0L); // reset the counter so the wait below works for every table
            // Get a connection to the core (production) library
            Connection coreConnection = connectionFactory.getDMSConnection(4);
            Statement coreStmt = coreConnection.createStatement();
            // Count the rows so the work can be split among the threads
            ResultSet coreRs = coreStmt.executeQuery("SELECT count(*) FROM " + tmpTBName);
            coreRs.next();
            long totalNum = coreRs.getLong(1);      // total rows to process
            // Rows handled by each thread; divide as double so the remainder is not lost
            long ownerRecordNum = (long) Math.ceil((double) totalNum / syncThreadNum);
            log.info("Total rows to synchronize: " + totalNum);
            log.info("Number of sync threads: " + syncThreadNum);
            log.info("Rows per thread: " + ownerRecordNum);
            // Start the worker threads that push data to the target library
            for (int i = 0; i < syncThreadNum; i++) {
                // Examples of the generated SQL:
                // Select * From dms_core_ds Where id between 1 And 657398
                // Select * From dms_core_ds Where id between 657399 And 1314796
                // Select * From dms_core_ds Where id between 1314797 And 1972194
                // Select * From dms_core_ds Where id between 1972195 And 2629592
                // Select * From dms_core_ds Where id between 2629593 And 3286990
                // ...
                StringBuilder sqlBuilder = new StringBuilder();
                sqlBuilder.append("Select * From ").append(tmpTBName)
                        .append(" Where id between ").append(i * ownerRecordNum + 1)
                        .append(" And ").append(i * ownerRecordNum + ownerRecordNum);
                Thread workThread = new Thread(
                        new WorkerHandler(sqlBuilder.toString(), businessType, tmpTBName));
                workThread.setName("SyncThread-" + i);
                workThread.start();
            }
            // Busy-wait until every worker has reported all rows of this table
            while (currentSynCount.get() < totalNum);
            // Sleep briefly so the database gets a chance to commit the remaining batches
            // (only needed for JUnit tests: when a test finishes the JVM exits and the
            // worker threads would not get to commit)
            // Thread.sleep(1000 * 3);
            log.info("Core table " + tmpTBName + " synchronized, "
                    + currentSynCount.get() + " rows in total");
            coreRs.close();
            coreStmt.close();
            coreConnection.close();
        } // end for loop
    }

    public void setCoreTBNames(List<String> coreTBNames) {
        this.coreTBNames = coreTBNames;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public void setSyncThreadNum(int syncThreadNum) {
        this.syncThreadNum = syncThreadNum;
    }

    // Worker thread that copies one id range into the target table
    final class WorkerHandler implements Runnable {
        String queryStr;
        int businessType;
        String targetTBName;

        public WorkerHandler(String queryStr, int businessType, String targetTBName) {
            this.queryStr = queryStr;
            this.businessType = businessType;
            this.targetTBName = targetTBName;
        }

        @Override
        public void run() {
            try {
                // start the synchronization for this range
                launchSyncData();
            } catch (Exception e) {
                log.error(e);
                e.printStackTrace();
            }
        }

        // Copy the rows selected by queryStr into the target table
        void launchSyncData() throws Exception {
            // Connection to the core (production) library
            Connection coreConnection = connectionFactory.getDMSConnection(4);
            Statement coreStmt = coreConnection.createStatement();
            // Connection to the target (development) library
            Connection targetConn = connectionFactory.getDMSConnection(businessType);
            targetConn.setAutoCommit(false);    // commit manually
            PreparedStatement targetPstmt = targetConn.prepareStatement(
                    "INSERT INTO " + targetTBName + " VALUES (?,?,?,?,?)");
            ResultSet coreRs = coreStmt.executeQuery(queryStr);
            log.info(Thread.currentThread().getName() + "'s Query SQL::" + queryStr);
            int batchCounter = 0;               // rows accumulated in the current batch
            while (coreRs.next()) {
                targetPstmt.setString(1, coreRs.getString(2));
                targetPstmt.setString(2, coreRs.getString(3));
                targetPstmt.setString(3, coreRs.getString(4));
                targetPstmt.setString(4, coreRs.getString(5));
                targetPstmt.setString(5, coreRs.getString(6));
                targetPstmt.addBatch();
                batchCounter++;
                currentSynCount.incrementAndGet();
                if (batchCounter % 10000 == 0) { // commit every 10,000 rows
                    targetPstmt.executeBatch();
                    targetPstmt.clearBatch();
                    targetConn.commit();
                }
            }
            // Commit whatever is left in the batch
            targetPstmt.executeBatch();
            targetPstmt.clearBatch();
            targetConn.commit();
            // Release the target connection and close the source side as well
            connectionFactory.release(targetConn, targetPstmt, coreRs);
            coreStmt.close();
            coreConnection.close();
        }
    }
}