TopDev

xây dựng job đồng bộ dữ liệu bằng java từ 2 bảng (Oracle DB) về 1 bảng log chung

minhu 📖 6 phút đọc 💬 1 phản hồi

Để xây dựng job đồng bộ dữ liệu từ 2 bảng (Oracle DB) về 1 bảng log chung, mỗi bảng có khoảng 50.000 bản ghi mới mỗi ngày, bạn cần thiết kế giải pháp theo hướng hiệu năng cao, ổn định và tránh trùng lặp. Dưới đây là giải pháp chi tiết:



🎯 Mục tiêu#

  • Đồng bộ dữ liệu mỗi ngày (hoặc mỗi X phút nếu cần realtime hơn).

  • Tích hợp dữ liệu từ 2 bảng nguồn vào 1 bảng log chung.

  • Tránh đồng bộ trùng dữ liệu.

  • Hiệu suất tốt với khối lượng dữ liệu lớn.



1. Thiết kế cơ sở dữ liệu#

Bảng nguồn:#

Giả sử bạn có:

  • SOURCE_A(id, data, created_at, ...)

  • SOURCE_B(id, data, created_at, ...)

Bảng log chung:#

LOG_TABLE ( id NUMBER PRIMARY KEY, source_table VARCHAR2(20), -- 'A' hoặc 'B' source_id NUMBER, -- ID của bản ghi từ bảng gốc data CLOB, -- Dữ liệu đồng bộ (tùy theo nhu cầu có thể lưu JSON/XML) created_at DATE, synced_at DATE DEFAULT SYSDATE )

Ràng buộc duy nhất:

ALTER TABLE LOG_TABLE ADD CONSTRAINT uq_source_id UNIQUE (source_table, source_id);



2. Cách đánh dấu bản ghi đã xử lý#

Có 2 cách:

Cách 1: Dùng cột created_at#

  • Giả sử các bảng SOURCE_ASOURCE_B đều có cột created_at.

  • Lưu lại thời điểm cuối cùng đã đồng bộ bằng một bảng cấu hình:

SYNC_STATE ( source_table VARCHAR2(20) PRIMARY KEY, last_synced DATE )

Cách 2: Gắn cờ “đã đồng bộ” trong bảng gốc#

  • Thêm cột synced_flag CHAR(1) và cập nhật khi đã đồng bộ (Y).

  • Tốt khi bạn có quyền sửa bảng gốc.



3. Giải pháp job đồng bộ (Java hoặc PL/SQL)#

📌 Java Job (Spring Boot hoặc Scheduler đơn giản)#

Các bước:

  • Đọc thời điểm last_synced từ SYNC_STATE.

  • Truy vấn các bản ghi mới từ SOURCE_ASOURCE_B.

SELECT * FROM SOURCE_A WHERE created_at > :last_synced;

  • Ghi dữ liệu vào LOG_TABLE, dùng batch insert hoặc streaming nếu cần.

  • Cập nhật SYNC_STATE với MAX(created_at) vừa đồng bộ.

  • Xử lý ngoại lệ/trùng lặp bằng ràng buộc UNIQUE.

Pseudo-code:

` for (String source : ["A", "B"]) { Timestamp lastSynced = getLastSynced(source);

List<Data> newRecords = query("SELECT * FROM SOURCE_" + source + 
    " WHERE created_at > ?", lastSynced);

for (Data record : newRecords) {
    insert into LOG_TABLE (
        source_table, source_id, data, created_at
    );
}

update SYNC_STATE set last_synced = max(created_at) of batch;

} `



4. Hiệu năng & mở rộng#

  • Dùng batch size = 1000~5000 mỗi lần insert.

  • Cân nhắc dùng Oracle Bulk Insert (JDBC batch) hoặc PL/SQL với FORALL.

  • Nếu dữ liệu lớn, bạn có thể nén hoặc lưu dạng JSON ở cột data.



5. Tùy chọn mở rộng#

  • Ghi log vào Kafka để có thể xử lý async.

  • Gửi alert nếu số bản ghi giảm bất thường.

  • Tạo dashboard theo dõi số lượng log được đồng bộ mỗi ngày.



✅ Kết luận#

Thành phần Gợi ý
Loại job Java Scheduler / PL/SQL Job
Điều kiện lọc Dựa trên created_at hoặc synced_flag
Xử lý trùng Ràng buộc UNIQUE (source_table, source_id)
Hiệu năng Batch insert + chỉ xử lý dữ liệu mới
Lưu trạng thái Bảng SYNC_STATE để track tiến độ

1 phản hồi

  1. minhu #1

    Dưới đây là mẫu code Java thuần giúp bạn đồng bộ dữ liệu từ 2 bảng Oracle về 1 bảng log theo lịch (ví dụ mỗi ngày 1 lần), sử dụng JDBC + ScheduledExecutorService.



    📦 Cấu trúc project gọn gàng:#

    SyncJob/ ├── SyncJob.java // Class chạy chính ├── DBUtil.java // Kết nối Oracle ├── SyncService.java // Logic đồng bộ └── sync_config.properties // Config DB và truy vấn



    1. 🔧 sync_config.properties#

    db.url=jdbc:oracle:thin:@localhost:1521:xe db.username=your_user db.password=your_password sync.interval.seconds=86400



    2. 🔌 DBUtil.java#

    ` import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; import java.io.InputStream;

    public class DBUtil { private static String dbUrl; private static String dbUser; private static String dbPass;

    static {
        try (InputStream input = DBUtil.class.getClassLoader().getResourceAsStream("sync_config.properties")) {
            Properties prop = new Properties();
            prop.load(input);
            dbUrl = prop.getProperty("db.url");
            dbUser = prop.getProperty("db.username");
            dbPass = prop.getProperty("db.password");
            Class.forName("oracle.jdbc.OracleDriver");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(dbUrl, dbUser, dbPass);
    }
    

    } `



    3. 🔁 SyncService.java#

    ` import java.sql.*; import java.util.Date;

    public class SyncService {

    public void sync(String sourceTable) {
        System.out.println("Syncing from table: " + sourceTable + " at " + new Date());
    
        String query = "SELECT id, data, created_at FROM " + sourceTable + " WHERE created_at > (SELECT NVL(MAX(created_at), TO_DATE('2000-01-01','YYYY-MM-DD')) FROM log_table WHERE source_table = ?)";
    
        try (Connection conn = DBUtil.getConnection();
             PreparedStatement stmt = conn.prepareStatement(query)) {
    
            stmt.setString(1, sourceTable);
            ResultSet rs = stmt.executeQuery();
    
            int count = 0;
    
            while (rs.next()) {
                int id = rs.getInt("id");
                String data = rs.getString("data");
                Timestamp createdAt = rs.getTimestamp("created_at");
    
                insertLog(conn, sourceTable, id, data, createdAt);
                count++;
            }
    
            System.out.println("Synced " + count + " records from " + sourceTable);
    
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private void insertLog(Connection conn, String sourceTable, int sourceId, String data, Timestamp createdAt) {
        String insertSQL = "INSERT /*+ IGNORE_ROW_ON_DUPKEY_INDEX(log_table, (source_table, source_id)) */ " +
                           "INTO log_table (id, source_table, source_id, data, created_at, synced_at) " +
                           "VALUES (log_seq.NEXTVAL, ?, ?, ?, ?, SYSTIMESTAMP)";
        try (PreparedStatement stmt = conn.prepareStatement(insertSQL)) {
            stmt.setString(1, sourceTable);
            stmt.setInt(2, sourceId);
            stmt.setString(3, data);
            stmt.setTimestamp(4, createdAt);
            stmt.executeUpdate();
        } catch (SQLIntegrityConstraintViolationException e) {
            // Trùng → bỏ qua
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    

    } `

    ✅ Gợi ý: bạn có thể dùng sequence log_seq trong Oracle cho cột id của bảng log.



    4. 🕒 SyncJob.java#

    ` import java.util.concurrent.*; import java.io.InputStream; import java.util.Properties;

    public class SyncJob { public static void main(String[] args) { try { Properties props = new Properties(); InputStream input = SyncJob.class.getClassLoader().getResourceAsStream("sync_config.properties"); props.load(input);

            long interval = Long.parseLong(props.getProperty("sync.interval.seconds", "86400")); // Mặc định 24h
    
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            SyncService syncService = new SyncService();
    
            Runnable task = () -> {
                syncService.sync("SOURCE_A");
                syncService.sync("SOURCE_B");
            };
    
            scheduler.scheduleAtFixedRate(task, 0, interval, TimeUnit.SECONDS);
    
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

    } `



    ✅ Cách chạy#

    • Build project thành file .jar.

    • Chạy:

    java -jar SyncJob.jar

    • Có thể dùng cron hoặc systemd để chạy job theo lịch hàng ngày.


    ✅ Ưu điểm#

    • Không phụ thuộc framework.

    • Dễ triển khai, bảo trì.

    • Có thể nâng cấp lên Spring Boot sau nếu cần mở rộng.

Bài liên quan trong #Java

✓ Đã sao chép link