如何用源码分析canal的deployer模块

技术如何用源码分析canal的deployer模块这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Cana

这期内容当中小编将会给大家带来有关如何用源码分析运河的部署者模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

CanalLauncher是启动入口类

获取运河物业配置文件

如果运河物业配置文件中属性root.admin.manager有值,那么构造平面配置客户端,调用平面配置客户端的findServer获取平原运河,调用平原运河的获取属性方法获取性能

通过性能构造CanalStarter并调用其开始方法

CanalStarter是启动类

publicsynchronizedvoidstart()

//首先根据canal.serverMode构造CanalMQProducer,如果是卡夫卡,构造的是CanalKafkaProducer

StringserverMode=can控制器。getproperty(properties,CanalConstants .CANAL _ SERVER _ MODE);

if(servermode。equalsignorase('卡夫卡'){ 0

canalMQProducer=new canalkafkaproducer();

} else if(服务器模式。equalSignorCase(' rockeq '){ }

canalMQProducer=new canalrockemqproducer();

}

if(canamqproducer!=null){ 0

//禁用

系统。设置属性(CanaConstants .CANAL _ NORTH _ NETTY,' true ');

//设置为生的避免字节排序-条目的二次解析

系统。设置属性(' canal。实例。记忆。rawentry ',' false ');

}

//接下来构造CanalController并调用其开始方法

伐木工。信息(#启动运河服务器));

n

bsp;      controller = new CanalController(properties);
        controller.start();
        logger.info("## the canal server is running now ......");
        ...
        //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
            canalMQStarter.start(mqProperties, destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }
        ...
        running = true;
    }

  • CanalController是实例调度控制器

public CanalController(final Properties properties){
        // 初始化managerClients用于请求admin
        managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() {
            public PlainCanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });
        // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml
        initInstanceConfig(properties);
        ...
        // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
        int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
        embededCanalServer.setMetricsPort(metricsPort);
        this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
        embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));
        ...
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系统目录
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }
        // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例
        final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
            ...
        }));
        // 初始化InstanceAction,用于启动和关闭实例
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {
                ...
            };
            // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance
            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {
                public InstanceConfigMonitor apply(InstanceMode mode) {
                    ...
                }
            });
        }
    }
  • ManagerInstanceConfigMonitor是实例扫描器

public void start() {
        super.start();
        //启动定时任务,定时扫描所有instance
        executor.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    scan();
                    if (isFirst) {
                        isFirst = false;
                    }
                } catch (Throwable e) {
                    logger.error("scan failed", e);
                }
            }
        }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
    }
private void scan() {
        //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启
        String instances = configClient.findInstances(null);
        final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
        List<String> start = Lists.newArrayList();
        List<String> stop = Lists.newArrayList();
        List<String> restart = Lists.newArrayList();
        for (String instance : is) {
            if (!configs.containsKey(instance)) {
                PlainCanal newPlainCanal = configClient.findInstance(instance, null);
                if (newPlainCanal != null) {
                    configs.put(instance, newPlainCanal);
                    start.add(instance);
                }
            } else {
                PlainCanal plainCanal = configs.get(instance);
                PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
                if (newPlainCanal != null) {
                    // 配置有变化
                    restart.add(instance);
                    configs.put(instance, newPlainCanal);
                }
            }
        }
        configs.forEach((instance, plainCanal) -> {
            if (!is.contains(instance)) {
                stop.add(instance);
            }
        });
        stop.forEach(instance -> {
            notifyStop(instance);
        });
        restart.forEach(instance -> {
            notifyReload(instance);
        });
        start.forEach(instance -> {
            notifyStart(instance);
        });
    }
private void notifyStart(String destination) {
        try {
            //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例
            defaultAction.start(destination);
            actions.put(destination, defaultAction);
            // 启动成功后记录配置文件信息
        } catch (Throwable e) {
            logger.error(String.format("scan add found[%s] but start failed", destination), e);
        }
    }
  • ServerRunningMonitor是针对server的running实例控制

public ServerRunningMonitor(){
        // 创建父节点
        dataListener = new IZkDataListener() {
            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }
                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
                    releaseRunning();// 彻底释放mainstem
                }
                activeData = (ServerRunningData) runningData;
            }
            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的状态就是本机,则即时触发一下active抢占
                    initRunning();
                } else {
                    // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作
                    delayExector.schedule(new Runnable() {
                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }
        };
    }
public synchronized void start() {
        super.start();
        try {
            //首先调用listener的processStart方法
            processStart();
            if (zkClient != null) {
                // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
                // 监视/otteradmin/canal/destinations/{0}/running节点变化
                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                zkClient.subscribeDataChanges(path, dataListener);
                
                initRunning();
            } else {
                processActiveEnter();// 没有zk,直接启动
            }
        } catch (Exception e) {
            logger.error("start failed", e);
            // 没有正常启动,重置一下状态,避免干扰下一次start
            stop();
        }
    }
private void processStart() {
        if (listener != null) {
            try {
                //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port
                listener.processStart();
            } catch (Exception e) {
                logger.error("processStart failed", e);
            }
        }
    }
private void initRunning() {
        if (!isStart()) {
            return;
        }
        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            //尝试创建/otteradmin/canal/destinations/{0}/running节点
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例
            processActiveEnter();// 触发一下事件
            mutex.set(true);
            release = false;
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在节点,立即尝试一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
            initRunning();
        }
    }
  • canal.properties配置

canal.register.ip =
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.register.auto = true
canal.admin.register.cluster =

上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/52411.html

(0)

相关推荐

  • 战时管制是指,什么叫战时状态战时措施

    技术战时管制是指,什么叫战时状态战时措施它是指当国家安全受到严重威胁时,将军队处于最高级的备战准备,并向全国发布战争动员令的一种战斗紧张形势战时管制是指。交战国之间的敌对状态。一旦进入战争状态,交战国之间的外交关系、经济

    生活 2021年10月29日
  • 怎么安装BBED

    技术怎么安装BBED这篇文章主要介绍“怎么安装BBED”,在日常操作中,相信很多人在怎么安装BBED问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么安装BBED”的疑惑有所帮助!接下来,

    攻略 2021年11月5日
  • 有志不在年高无志空长百岁,无志空活百岁上一句是什么

    技术有志不在年高无志空长百岁,无志空活百岁上一句是什么有志不在年高有志不在年高无志空长百岁,无志空活百岁出自《甘罗十二为使臣》 【解释】 意思是指只要有远大志向,就算年纪小也没事,一个没有远大志向的人,即便活到很大的岁数

    生活 2021年10月27日
  • 杯弓蛇影的意思是,形容水中倒影的成语有哪些

    技术杯弓蛇影的意思是,形容水中倒影的成语有哪些水中倒影杯弓蛇影的意思是,并不是影子。影子是由于光的直线传播,当光线遇到不透明的物体时,在物体后面形成的黑暗区域。而水中倒影是光的反射现象,当物体射出的光线射到水面上时,被水

    生活 2021年10月27日
  • dexp和dimp工具的使用

    技术dexp和dimp工具的使用 dexp和dimp工具的使用dexp和dimp是达梦数据库逻辑备份还原工具,是数据库自带的两个命令行工具。逻辑备份和逻辑还原都是在联机方式下完成的。dexp和dimp对

    礼包 2021年11月1日
  • 抖音刷赞网,抖音刷赞网站推广永久?

    技术抖音刷赞网,抖音刷赞网站推广永久?抖音点赞、抖音粉丝、抖音评论、抖音播放是怎么刷合适?如今越来越多的人都会玩抖音,因为抖音里面有特别多有趣的内容,很多的用户会去把自己生活当中比较有趣的一些生活经验或者是生活经历发在抖

    测评 2021年11月10日