TopDev

Tối ưu hiệu suất insert dữ liệu lớn trong hệ thống Mediation Interconnect

minhdev 📖 12 phút đọc ✎ đã sửa

Trong các hệ thống tính cước viễn thông, đặc biệt là hệ thống Mediation cho dự án cước kết nối Interconnect, việc xử lý và insert dữ liệu lớn (như file CDR - Call Detail Record) vào cơ sở dữ liệu là một trong những nhiệm vụ then chốt ảnh hưởng trực tiếp đến hiệu năng toàn hệ thống. Bài viết này sẽ cung cấp các giải pháp tối ưu insert dữ liệu lớn, giúp nâng cao hiệu suất và đảm bảo độ tin cậy trong quy trình xử lý dữ liệu.



1. Tổng quan về hệ thống Mediation trong dự án Interconnect#

Hệ thống Mediation có vai trò thu thập, chuẩn hóa, lọc và chuyển đổi dữ liệu cước từ các hệ thống mạng về định dạng thống nhất, trước khi chuyển tới hệ thống tính cước hoặc lưu trữ.

Trong các dự án Interconnect giữa các nhà mạng, dữ liệu thường rất lớn, đến từ nhiều nguồn (CDR thoại, SMS, Data...), và có yêu cầu cao về độ chính xác, tính thời gian thực và hiệu suất xử lý.



2. Thách thức khi insert dữ liệu lớn vào hệ thống Mediation#

Việc insert hàng triệu bản ghi mỗi ngày, thậm chí mỗi giờ, có thể dẫn đến:

  • Nghẽn cổ chai ở tầng cơ sở dữ liệu

  • Quá tải tài nguyên CPU/RAM khi xử lý file CDR lớn

  • Gây lỗi do lock DB, duplicate, hoặc hết connection pool

  • Ảnh hưởng đến các hệ thống downstream như tính cước, phân tích dữ liệu



3. Giải pháp tăng hiệu suất insert dữ liệu lớn#

Dưới đây là các giải pháp thực tế đã được áp dụng tại nhiều hệ thống Mediation lớn trong ngành viễn thông:

3.1 Tối ưu xử lý tại tầng ứng dụng#

a. Batch Insert (Chèn theo lô)

  • Gom nhiều bản ghi (1000–5000 dòng) để insert một lần.

  • Giảm số lần giao tiếp với DB, tăng throughput.

  • Java JDBC: sử dụng addBatch()executeBatch().

b. Multi-threading (Xử lý đa luồng)

  • Chia nhỏ file CDR hoặc data stream để xử lý song song.

  • Sử dụng ExecutorService, ForkJoinPool hoặc CompletableFuture.

c. Streaming và xử lý theo dòng

  • Đọc dữ liệu dạng stream (InputStream) thay vì load toàn bộ file vào bộ nhớ.

  • Dùng Queue nội bộ hoặc Kafka để tách luồng xử lý.



3.2 Tối ưu cơ sở dữ liệu (DB)#

a. Tạm thời tắt Index, Trigger, Log

  • Khi chèn dữ liệu số lượng lớn, có thể tạm tắt index và trigger, sau đó kích hoạt lại sau khi insert hoàn tất.

b. Dùng công cụ Bulk Load chuyên biệt

  • Oracle: SQL*Loader, Direct Path Load

  • PostgreSQL: COPY command

  • MySQL: LOAD DATA INFILE

  • Tốc độ nhanh hơn insert bằng câu lệnh SQL thông thường.

c. Partition Table (phân vùng bảng)

  • Chia bảng theo ngày, theo nhà mạng hoặc theo loại cước.

  • Giảm contention, tối ưu hiệu suất truy vấn và chèn.

d. Tối ưu cấu hình DB

  • Tăng giới hạn batch size, pool size, commit interval

  • Tối ưu transaction: commit theo lô thay vì từng dòng



3.3 Ứng dụng kiến trúc xử lý phân tán#

a. Dùng Kafka để phân luồng dữ liệu

  • Kafka giúp chia nhỏ luồng dữ liệu từ nhiều nguồn (producer) và tiêu thụ song song bởi nhiều consumer.

  • Đảm bảo khả năng mở rộng và chịu lỗi tốt.

b. Áp dụng Apache Flink hoặc Spark Streaming

  • Xử lý dữ liệu CDR gần thời gian thực.

  • Có thể kết hợp với Kafka để tạo pipeline xử lý mạnh mẽ, linh hoạt.

c. Buffer trung gian bằng Redis

  • Redis hỗ trợ lưu đệm dữ liệu tạm thời giúp tránh mất dữ liệu khi DB quá tải.


4. Mô hình xử lý đề xuất cho dự án Interconnect#

CDR Files → Parser đa luồng → Queue nội bộ / Kafka → Batch Inserter → DB Partition

Ưu điểm:#

  • Tách biệt rõ các thành phần xử lý

  • Dễ mở rộng hoặc điều chỉnh từng bước

  • Có thể theo dõi hiệu suất từng phần



5. Một số công cụ hỗ trợ tối ưu insert#

Công cụ Mô tả
Apache Kafka Phân phối dữ liệu theo topic
SQL*Loader Nạp dữ liệu nhanh vào Oracle
Apache Flink Streaming CDR real-time
Redis Cache dữ liệu tạm thời
Spring Batch Batch insert theo job định kỳ
Pentaho, Talend ETL xử lý trực quan


6. Kinh nghiệm thực tế từ các hệ thống lớn#

  • Viettel: sử dụng Kafka + Flink cho xử lý realtime CDR

  • Mobifone: tối ưu bằng batch insert kết hợp phân vùng bảng Oracle

  • VNPT: Java Mediation Parser đa luồng + JDBC batch + monitor



Kết luận#

Việc tăng hiệu suất insert dữ liệu lớn trên hệ thống Mediation dự án cước kết nối Interconnect không chỉ là bài toán về kỹ thuật lập trình mà còn đòi hỏi cách tiếp cận kiến trúc tổng thể, từ tầng ứng dụng đến tầng cơ sở dữ liệu. Các giải pháp như batch insert, multi-threading, Kafka, phân vùng bảng và bulk load đã chứng minh hiệu quả rõ rệt trong thực tế.

Do đó, tùy theo quy mô hệ thống và yêu cầu cụ thể, bạn có thể áp dụng linh hoạt các phương pháp trên để tối ưu hiệu năng và đảm bảo tính ổn định của hệ thống Mediation.



Từ khóa SEO phụ đề xuất chèn vào nội dung (tự nhiên):

  • tối ưu insert CDR

  • Mediation xử lý dữ liệu lớn

  • tăng tốc ghi dữ liệu viễn thông

  • hệ thống tính cước Interconnect

  • batch insert Java JDBC

  • SQL Loader cho Oracle

  • xử lý file CDR hiệu quả



Dưới đây là một hướng dẫn kỹ thuật chi tiết bằng mã Java để thực hiện Batch Insert dữ liệu lớn vào hệ thống cơ sở dữ liệu trong bối cảnh hệ thống Mediation của dự án cước kết nối Interconnect.



💡 Mục tiêu:

  • Đọc dữ liệu CDR (giả lập từ file hoặc input stream)

  • Xử lý và chèn vào DB theo dạng batch

  • Sử dụng JDBC chuẩn (có thể tích hợp vào ứng dụng Mediation hiện tại)



🧱 Yêu cầu hệ thống:#

  • Java 8 trở lên

  • JDBC Driver phù hợp (VD: Oracle, PostgreSQL, MySQL)

  • File dữ liệu CDR dạng CSV hoặc dạng dòng text



📦 Cấu trúc bảng dữ liệu ví dụ (cdr_table)#

CREATE TABLE cdr_table ( id SERIAL PRIMARY KEY, msisdn VARCHAR(20), call_time TIMESTAMP, duration INT, charge_amount NUMERIC(10,2) );



🧪 Demo Java: Batch Insert file CDR vào cơ sở dữ liệu#

` import java.io.BufferedReader; import java.io.FileReader; import java.sql.*; import java.util.ArrayList; import java.util.List;

public class BatchInsertCDR { private static final String DB_URL = "jdbc:postgresql://localhost:5432/interconnectdb"; private static final String DB_USER = "dbuser"; private static final String DB_PASSWORD = "dbpass";

private static final int BATCH_SIZE = 1000;

public static void main(String[] args) {
    String cdrFilePath = "cdr_data.csv"; // File chứa dữ liệu CDR

    try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
        conn.setAutoCommit(false); // Tăng hiệu suất bằng cách dùng transaction

        String sql = "INSERT INTO cdr_table (msisdn, call_time, duration, charge_amount) VALUES (?, ?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {

            try (BufferedReader reader = new BufferedReader(new FileReader(cdrFilePath))) {
                String line;
                int count = 0;

                while ((line = reader.readLine()) != null) {
                    String[] parts = line.split(",");

                    if (parts.length < 4) continue; // bỏ qua dòng lỗi

                    String msisdn = parts[0];
                    Timestamp callTime = Timestamp.valueOf(parts[1]); // yyyy-MM-dd HH:mm:ss
                    int duration = Integer.parseInt(parts[2]);
                    double amount = Double.parseDouble(parts[3]);

                    ps.setString(1, msisdn);
                    ps.setTimestamp(2, callTime);
                    ps.setInt(3, duration);
                    ps.setDouble(4, amount);

                    ps.addBatch();
                    count++;

                    if (count % BATCH_SIZE == 0) {
                        ps.executeBatch();
                        conn.commit();
                        System.out.println("Inserted " + count + " rows...");
                    }
                }

                ps.executeBatch(); // insert phần còn lại
                conn.commit();
                System.out.println("Finished inserting all data.");
            }

        } catch (Exception e) {
            conn.rollback();
            System.err.println("Lỗi khi insert: " + e.getMessage());
            e.printStackTrace();
        }

    } catch (Exception e) {
        System.err.println("Không thể kết nối DB: " + e.getMessage());
    }
}

} `



📄 Ví dụ dữ liệu cdr_data.csv#

84901234567,2025-07-28 08:30:00,120,1500.50 84909876543,2025-07-28 08:45:15,60,750.25 ...



✅ Ưu điểm kỹ thuật:#

  • Batch size giúp giảm số lần gọi DB → tăng hiệu suất

  • Transaction control (conn.setAutoCommit(false)): đảm bảo rollback nếu lỗi

  • Xử lý dòng lớn mà không tốn RAM: dùng BufferedReader



🔧 Gợi ý nâng cao#

  • Đọc file song song bằng multi-thread

  • Đọc từ Kafka topic thay vì file CSV nếu dùng kiến trúc real-time

  • Ghi log các dòng lỗi ra file riêng (dirty record)

  • Tích hợp Prometheus/Grafana để giám sát insert throughput



Dưới đây là một phiên bản nâng cấp của Batch Insert Java, trong đó dữ liệu CDR được đọc từ Kafkainsert theo lô (batch) vào cơ sở dữ liệu, phù hợp cho hệ thống Mediation của dự án cước kết nối Interconnect.



📌 Mục tiêu:#

  • Consumer Kafka topic chứa bản ghi CDR

  • Gom dữ liệu thành từng batch và insert vào DB (PostgreSQL, Oracle…)

  • Tăng hiệu suất và độ ổn định



🧱 Công nghệ sử dụng:#

  • Java

  • Kafka Client (org.apache.kafka:kafka-clients)

  • JDBC



📋 Mô hình kiến trúc:#

[CDR Producer] → Kafka Topic ("cdr-topic") → [CDRConsumer.java] → Batch Insert → DB



📦 Cấu hình Maven#

<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.7.1</version> </dependency> </dependencies>



🧪 Code Java: Consumer Kafka và Batch Insert#

` import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.serialization.StringDeserializer;

import java.sql.; import java.time.Duration; import java.util.;

public class CDRKafkaConsumer {

private static final String KAFKA_SERVERS = "localhost:9092";
private static final String KAFKA_TOPIC = "cdr-topic";
private static final String GROUP_ID = "cdr-inserter-group";

private static final String DB_URL = "jdbc:postgresql://localhost:5432/interconnectdb";
private static final String DB_USER = "dbuser";
private static final String DB_PASSWORD = "dbpass";

private static final int BATCH_SIZE = 1000;

public static void main(String[] args) {

    // Kafka consumer config
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {

        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
        conn.setAutoCommit(false);

        String sql = "INSERT INTO cdr_table (msisdn, call_time, duration, charge_amount) VALUES (?, ?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {

            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                }

                if (buffer.size() >= BATCH_SIZE) {
                    for (ConsumerRecord<String, String> r : buffer) {
                        String[] parts = r.value().split(",");
                        if (parts.length < 4) continue;

                        ps.setString(1, parts[0]);
                        ps.setTimestamp(2, Timestamp.valueOf(parts[1]));
                        ps.setInt(3, Integer.parseInt(parts[2]));
                        ps.setDouble(4, Double.parseDouble(parts[3]));
                        ps.addBatch();
                    }

                    ps.executeBatch();
                    conn.commit();
                    consumer.commitSync();
                    System.out.println("Inserted batch of " + buffer.size());
                    buffer.clear();
                }
            }

        } catch (Exception e) {
            conn.rollback();
            System.err.println("Error in batch insert: " + e.getMessage());
            e.printStackTrace();
        }

    } catch (Exception e) {
        System.err.println("Kafka/DB connection failed: " + e.getMessage());
    }
}

} `



📄 Dữ liệu Kafka CDR ví dụ (format value):#

84901234567,2025-07-28 08:30:00,120,1500.50 84909876543,2025-07-28 08:45:15,60,750.25



✅ Ưu điểm kỹ thuật:#

  • Kafka giúp phân tán dữ liệu từ nhiều producer (site, node...)

  • Batch insert giúp giảm số lần gọi DB

  • Tự động commit Kafka offset sau mỗi batch thành công → tránh mất dữ liệu



🧠 Gợi ý mở rộng:#

  • Dùng Kafka Consumer Group để chia luồng xử lý nhiều node

  • Ghi log các bản ghi lỗi vào file riêng hoặc Kafka topic cdr-dead-letter

  • Dùng Prometheus để theo dõi độ trễ insert và số lượng bản ghi đã xử lý

  • Cân nhắc thêm thread pool để insert song song nhiều batch



Dưới đây là ví dụ hoàn chỉnh về một Kafka Producer bằng Java dùng để giả lập gửi dữ liệu CDR vào topic Kafka "cdr-topic".



🎯 Mục tiêu:#

  • Gửi dữ liệu CDR giả lập vào Kafka topic theo định dạng CSV (1 dòng = 1 record)

  • Mỗi record gồm: msisdn, call_time, duration, charge_amount

  • Tùy chọn gửi ngẫu nhiên hoặc từ file thật



🧱 Yêu cầu:#

  • Cài Kafka server (localhost:9092)

  • Kafka topic: cdr-topic (nếu chưa có, tạo bằng lệnh CLI)

  • Java 8+

  • Thêm dependency Kafka Client



📦 Maven dependency#

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.7.0</version> </dependency>



🧪 Code Java: Kafka CDR Producer#

` import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer;

import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Properties; import java.util.Random;

public class CDRKafkaProducer {

private static final String KAFKA_SERVERS = "localhost:9092";
private static final String KAFKA_TOPIC = "cdr-topic";

private static final Random random = new Random();
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) {

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVERS);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

        for (int i = 1; i <= 10000; i++) {
            String msisdn = "8490" + (1000000 + random.nextInt(9000000));
            String callTime = dtf.format(LocalDateTime.now().minusMinutes(random.nextInt(120)));
            int duration = 30 + random.nextInt(300); // Giây
            double charge = Math.round((duration * 12.5 + random.nextDouble() * 10) * 100.0) / 100.0;

            String value = msisdn + "," + callTime + "," + duration + "," + charge;

            ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, null, value);
            producer.send(record);

            if (i % 100 == 0) {
                System.out.println("Sent " + i + " CDR records.");
                Thread.sleep(10); // Sleep nhẹ để tránh spam Kafka quá nhanh
            }
        }

        System.out.println("CDR Kafka Producer finished sending 10,000 records.");

    } catch (Exception e) {
        e.printStackTrace();
    }
}

} `



📄 Ví dụ dữ liệu gửi lên Kafka#

84901234567,2025-07-28 08:30:00,120,1500.50 84909876543,2025-07-28 08:45:15,60,750.25 ...



✅ Gợi ý nâng cao:#

  • Cho phép đọc từ file CSV thực: dùng BufferedReaderproducer.send() từng dòng

  • Thêm metadata vào record như record.key hoặc partition

  • Gửi theo batch bằng producer.sendAsync().get()



📌 Lệnh tạo topic Kafka nếu chưa có:#

kafka-topics.sh --create --topic cdr-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1



Bạn có muốn mình viết thêm:

  • Phiên bản Producer đọc file CSV thật?

  • Hay tích hợp vào Spring Boot Kafka Producer luôn?

Bài liên quan trong #Tin tức

✓ Đã sao chép link