这期内容当中小编将会给大家带来有关如何用源码分析运河的部署者模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
CanalLauncher是启动入口类
获取运河物业配置文件
如果运河物业配置文件中属性root.admin.manager有值,那么构造平面配置客户端,调用平面配置客户端的findServer获取平原运河,调用平原运河的获取属性方法获取性能
通过性能构造CanalStarter并调用其开始方法
CanalStarter是启动类
publicsynchronizedvoidstart()
//首先根据canal.serverMode构造CanalMQProducer,如果是卡夫卡,构造的是CanalKafkaProducer
StringserverMode=can控制器。getproperty(properties,CanalConstants .CANAL _ SERVER _ MODE);
if(servermode。equalsignorase('卡夫卡'){ 0
canalMQProducer=new canalkafkaproducer();
} else if(服务器模式。equalSignorCase(' rockeq '){ }
canalMQProducer=new canalrockemqproducer();
}
if(canamqproducer!=null){ 0
//禁用
系统。设置属性(CanaConstants .CANAL _ NORTH _ NETTY,' true ');
//设置为生的避免字节排序-条目的二次解析
系统。设置属性(' canal。实例。记忆。rawentry ',' false ');
}
//接下来构造CanalController并调用其开始方法
伐木工。信息(#启动运河服务器));
n
bsp; controller = new CanalController(properties);
controller.start();
logger.info("## the canal server is running now ......");
...
//构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性
if (canalMQProducer != null) {
canalMQStarter = new CanalMQStarter(canalMQProducer);
MQProperties mqProperties = buildMQProperties(properties);
String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
canalMQStarter.start(mqProperties, destinations);
controller.setCanalMQStarter(canalMQStarter);
}
...
running = true;
}
-
CanalController是实例调度控制器
public CanalController(final Properties properties){ // 初始化managerClients用于请求admin managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() { public PlainCanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml initInstanceConfig(properties); ... // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); ... final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点 if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系统目录 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() { ... })); // 初始化InstanceAction,用于启动和关闭实例 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { ... }; // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() { public InstanceConfigMonitor apply(InstanceMode mode) { ... } }); } }
-
ManagerInstanceConfigMonitor是实例扫描器
public void start() { super.start(); //启动定时任务,定时扫描所有instance executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { scan(); if (isFirst) { isFirst = false; } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); } private void scan() { //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启 String instances = configClient.findInstances(null); final List<String> is = Lists.newArrayList(StringUtils.split(instances, ',')); List<String> start = Lists.newArrayList(); List<String> stop = Lists.newArrayList(); List<String> restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); if (newPlainCanal != null) { configs.put(instance, newPlainCanal); start.add(instance); } } else { PlainCanal plainCanal = configs.get(instance); PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); if (newPlainCanal != null) { // 配置有变化 restart.add(instance); configs.put(instance, newPlainCanal); } } } configs.forEach((instance, plainCanal) -> { if (!is.contains(instance)) { stop.add(instance); } }); stop.forEach(instance -> { notifyStop(instance); }); restart.forEach(instance -> { notifyReload(instance); }); start.forEach(instance -> { notifyStart(instance); }); } private void notifyStart(String destination) { try { //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例 defaultAction.start(destination); actions.put(destination, defaultAction); // 启动成功后记录配置文件信息 } catch (Throwable e) { logger.error(String.format("scan add found[%s] but start failed", destination), e); } }
-
ServerRunningMonitor是针对server的running实例控制
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; } public synchronized void start() { super.start(); try { //首先调用listener的processStart方法 processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start // 监视/otteradmin/canal/destinations/{0}/running节点变化 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } } private void processStart() { if (listener != null) { try { //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } } private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //尝试创建/otteradmin/canal/destinations/{0}/running节点 zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例 processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }
-
canal.properties配置
canal.register.ip = canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.admin.register.auto = true canal.admin.register.cluster =
上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/52411.html