Debezium 与 Apache Kafka 的集成方式

news/2025/2/21 5:17:31

一、集成概述

Debezium 与 Apache Kafka 的集成主要通过 Kafka Connect 实现。

Kafka Connect 是一个用于数据集成的分布式平台,而 Debezium 作为 Kafka Connect 的 Source Connector,负责将数据库的变更数据捕获并发送到 Kafka。

二、集成步骤

1. 准备 Kafka 环境

安装 Kafka:确保你已经安装并启动了 Kafka 和 Zookeeper。如果使用 Docker,可以参考以下命令启动 Kafka 和 Zookeeper:

docker run -d --name zookeeper -p 2181:2181 -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest

2. 配置 Kafka Connect

下载并安装 Kafka Connect:确保 Kafka Connect 已安装并配置好。

配置 Kafka Connect:编辑 connect-distributed.properties 文件,设置 Kafka 集群地址和插件路径:

bootstrap.servers=localhost:9092
plugin.path=/path/to/your/plugins

3. 安装 Debezium Connector

下载 Debezium Connector 插件:根据你的数据库类型(如 MySQL、PostgreSQL 等),下载对应的 Debezium Connector 插件。

解压并放置插件:将下载的插件解压到 Kafka Connect 的插件目录。

4. 启动 Kafka Connect

启动 Kafka Connect:使用以下命令启动 Kafka Connect:

bin/connect-distributed.sh config/connect-distributed.properties

5. 注册 Debezium Connector

创建 Connector 配置文件:根据你的数据库类型和需求,创建一个 JSON 格式的配置文件。例如,对于 MySQL 数据库:

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "mydatabase",
    "table.include.list": "mydatabase.mytable",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.mydatabase"
  }
}

注册 Connector:通过 Kafka Connect 的 REST API 注册 Connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @mysql-connector.json

6. 验证集成

查看 Connector 状态:通过以下命令查看 Connector 的状态:

curl http://localhost:8083/connectors/mysql-connector/status

检查 Kafka Topic:在 Kafka 中查看生成的 Topic,确保数据正在流入。

三、注意事项

  • 数据库配置:确保数据库已配置好相应的参数,如 MySQL 的 binlog 或 PostgreSQL 的 wal_level。
  • 插件路径:确保 Kafka Connect 的 plugin.path 配置正确,指向 Debezium 插件所在目录。
  • 网络问题:如果使用 Docker,确保 Kafka Connect 和数据库之间可以正常通信。

通过以上步骤,你可以将 Debezium 与 Apache Kafka 集成,实现数据库变更数据的实时捕获和同步。


http://www.niftyadmin.cn/n/5860188.html

相关文章

【安装Tvikit包的时候提示 OpenCV 的旧宏 CV_WINDOW_AUTOSIZE不适配opencv3+】

安装Tvikit包的时候提示 OpenCV 的旧宏 CV_WINDOW_AUTOSIZE不适配opencv3 1. 查找所有 OpenCV 旧宏 使用 grep 工具全局搜索 CV_ 开头的宏,确保没有遗漏: bash grep -r “CV_” ~/lidar_odometry/src/fast_livo_ws/src/rpg_vikit/vikit_common 检查输…

从 JS 到 Dart:语法基础

声明:var final const。支持自动推断类型,但类型一直固定。未初始化的值为 null Final vs const: const 编译时确定,final 运行时确定 基本类型:num int double String bool List Set Map int.parse(‘1’); // 1 1.toString()带…

我用Ai学Android Jetpack Compose之Composable与View的区别与联系

本篇,我们来学习Composable与View的联系和区别,答案来自 通义千问, Q: Composable与View的联系和区别 在 Android 开发中,Composable 和 View 是两种不同的 UI 构建方式。它们分别属于 Jetpack Compose 和传统基于 XML 的视图系…

MySQL如何解决幻读?

目录 一、什么是幻读? 1.1 幻读的定义 1.2 幻读的示例 1.3 幻读产生的原因? 1.4?读已提交(Read Committed) 1.4.1 确定事务等级 1.4.2 非锁定读取 准备 示例 结论 1.4.3 锁定读取 准备 示例 分析 结论 1.5?可重…

MyBatis-Plus之通用枚举

MyBatis-Plus之通用枚举 前言 MyBatis-Plus中提供了通用枚举,简单来说就是将数据库中的某一字段的代替的含义转换成真实的含义将数据展示给用户,用户在存储时也会将真实值转换成代替的数字存入到数据库中。举个例子:用户性别在数据库中存储…

JavaScript 前端面试 2(DOM、BOM)

三、DOM常见的操作有哪些 创建、删除、查询、更新节点(用类似数据库的操作来讲就是对节点的增删改查),节点可以是元素(element)、文本(text)、注释(comment)等等。 1&a…

Vue.js 入门指南:从基础到实战

Vue.js 是一款流行的渐进式 JavaScript 框架,广泛用于构建交互式 Web 界面。它具有简单易学、轻量级、高性能的特点,适合前端新手入门。本文将从 Vue 的基本概念入手,详细介绍 Vue 的生命周期及常见用法,帮助你快速上手 Vue 开发 官网:https://cn.vuejs.org/ 1. Vue.js 介…

智能体(AI Agent、Deepseek、硅基流动)落地实践Demo——借助大模型生成报表,推动AI赋能企业决策

文章目录 一、 引言二、 系统设计与技术细节2.1 系统架构2.2 核心组件说明 三、 Demo 代码推荐博客: 四、输出年度营销报告1. 总销售额 根据提供的数据,年度总销售额为:740.0。2. 各产品销售额3. 各地区销售额4. 各产品在各地区的销售情况 分…