ShardingSphere~6

Scaling

这一部分的实现跟之前有显著的不同。先从入口模块 shardingsphere-scaling-bootstrap 的 ScalingServerBootstrap 看起:先初始化配置,再启动一个基于 Netty 的 HTTP 服务

/**
 * Command-line entry point of the ShardingSphere-Scaling HTTP server.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class ScalingServerBootstrap {
    
    /**
     * Initialize the server configuration, then run the HTTP server until its channel is closed.
     *
     * @param args startup arguments (not read by this method)
     */
    public static void main(final String[] args) {
        // CHECKSTYLE:ON
        log.info("ShardingSphere-Scaling Server Startup");
        ServerConfigurationInitializer.init();
        startScalingServer();
    }
    
    /**
     * Bind the HTTP server to the configured port and block the calling thread until the server channel closes.
     *
     * <p>Both event loop groups are shut down gracefully on the way out, whether the server stopped normally or not.</p>
     */
    @SneakyThrows(InterruptedException.class)
    private static void startScalingServer() {
        log.info("Start scaling server");
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpServerInitializer());
            int listenPort = ScalingContext.getInstance().getServerConfig().getPort();
            Channel serverChannel = serverBootstrap.bind(listenPort).sync().channel();
            log.info("ShardingSphere-Scaling is server on http://127.0.0.1:{}/", listenPort);
            // Park here for the lifetime of the server.
            serverChannel.closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

继续跟进,看起来似乎就是解码http请求,再处理下

/**
 * Channel initializer that installs the HTTP handlers on each new socket channel.
 */
public final class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    
    @Override
    protected void initChannel(final SocketChannel socketChannel) {
        // Handler order is significant: HTTP codec first, then the aggregator, then the request handler.
        socketChannel.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(65536))
                .addLast(new HttpServerHandler());
    }
}

进到最后一个 handler 就十分清晰了:拿到请求后,按照请求路径(/scaling/job/***)分发到对应的处理方法

public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
    
    private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
    
    @Override
    protected void channelRead0(final ChannelHandlerContext context, final FullHttpRequest request) {
        String requestPath = request.uri().toLowerCase();
        String requestBody = request.content().toString(CharsetUtil.UTF_8);
        log.info("Http request path: {}", requestPath);
        log.info("Http request body: {}", requestBody);
        if ("/scaling/job/start".equalsIgnoreCase(requestPath) && request.method().equals(HttpMethod.POST)) {
            startJob(context, requestBody);
            return;
        }
        if ("/scaling/job/list".equalsIgnoreCase(requestPath)) {
            listJobs(context);
            return;
        }
        if (requestPath.startsWith("/scaling/job/progress/")) {
            getJobProgress(context, requestPath);
            return;
        }
        if (requestPath.startsWith("/scaling/job/stop/")) {
            stopJob(context, requestPath);
            return;
        }
        if (requestPath.contains("/scaling/job/check/")) {
            checkJob(context, requestPath);
            return;
        }
        if (requestPath.contains("/scaling/job/reset/")) {
            resetJob(context, requestPath);
            return;
        }
        response(ResponseContentUtil.handleBadRequest("Not support request!"), context, HttpResponseStatus.BAD_REQUEST);
    }
    
    private void startJob(final ChannelHandlerContext context, final String requestBody) {
        Optional<Long> jobId = scalingAPI.start(GSON.fromJson(requestBody, JobConfiguration.class));
        if (jobId.isPresent()) {
            response(ResponseContentUtil.build(jobId.get()), context, HttpResponseStatus.OK);
            return;
        }
        response(ResponseContentUtil.handleBadRequest("Invalid scaling job config!"), context, HttpResponseStatus.BAD_REQUEST);
    }
    
    private void listJobs(final ChannelHandlerContext context) {
        List<JobInfo> jobContexts = scalingAPI.list();
        response(ResponseContentUtil.build(jobContexts), context, HttpResponseStatus.OK);
    }
    
    private void getJobProgress(final ChannelHandlerContext context, final String requestPath) {
        try {
            response(ResponseContentUtil.build(scalingAPI.getProgress(getJobId(requestPath))), context, HttpResponseStatus.OK);
        } catch (final ScalingJobNotFoundException ex) {
            response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
        }
    }
    
    private void stopJob(final ChannelHandlerContext context, final String requestPath) {
        scalingAPI.stop(getJobId(requestPath));
        response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
    }
    
    private void checkJob(final ChannelHandlerContext context, final String requestPath) {
        try {
            response(ResponseContentUtil.build(scalingAPI.dataConsistencyCheck(getJobId(requestPath))), context, HttpResponseStatus.OK);
        } catch (final ScalingJobNotFoundException ex) {
            response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
        }
    }
    
    private void resetJob(final ChannelHandlerContext context, final String requestPath) {
        try {
            scalingAPI.reset(getJobId(requestPath));
            response(ResponseContentUtil.success(), context, HttpResponseStatus.OK);
        } catch (final ScalingJobNotFoundException | SQLException ex) {
            response(ResponseContentUtil.handleBadRequest(ex.getMessage()), context, HttpResponseStatus.BAD_REQUEST);
        }
    }
    
    private long getJobId(final String requestPath) {
        return Long.parseLong(requestPath.split("/")[4]);
    }
    
    private void response(final Object content, final ChannelHandlerContext context, final HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(GSON.toJson(content), CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
        HttpUtil.setContentLength(response, response.content().readableBytes());
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        context.writeAndFlush(response);
    }
    
    @Override
    public void exceptionCaught(final ChannelHandlerContext context, final Throwable cause) {
        log.error("Http request handle occur error:", cause);
        response(ResponseContentUtil.handleException(cause.toString()), context, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        context.close();
    }
}
View Code

具体执行已经脱离了netty应用本身,这里shardingsphere框架封装了一个ScalingAPIFactory

具体看这个类的内容,发现实际执行是依赖了ElasticJob,所有命令都相当于是注册中心协调下的job运行

/**
 * Static factory for the lazily created, shared API objects used by scaling.
 *
 * <p>Every holder below creates its instance on first access inside a block synchronized on {@code ScalingAPIFactory.class}.</p>
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ScalingAPIFactory {
    
    /**
     * Get scaling API.
     *
     * @return scaling API
     */
    public static ScalingAPI getScalingAPI() {
        return ScalingAPIHolder.getInstance();
    }
    
    /**
     * Get governance repository API.
     *
     * @return governance repository API
     */
    public static GovernanceRepositoryAPI getGovernanceRepositoryAPI() {
        return GovernanceRepositoryAPIHolder.getInstance();
    }
    
    /**
     * Get job statistics API.
     *
     * @return job statistics API
     */
    public static JobStatisticsAPI getJobStatisticsAPI() {
        return ElasticJobAPIHolder.getInstance().getJobStatisticsAPI();
    }
    
    /**
     * Get job configuration API.
     *
     * @return job configuration API
     */
    public static JobConfigurationAPI getJobConfigurationAPI() {
        return ElasticJobAPIHolder.getInstance().getJobConfigurationAPI();
    }
    
    /**
     * Get job operate API.
     *
     * @return job operate API
     */
    public static JobOperateAPI getJobOperateAPI() {
        return ElasticJobAPIHolder.getInstance().getJobOperateAPI();
    }
    
    /**
     * Get registry center.
     *
     * @return Coordinator registry center
     */
    public static CoordinatorRegistryCenter getRegistryCenter() {
        return RegistryCenterHolder.getInstance();
    }
    
    /**
     * Fail fast when the server configuration or its governance section has not been set.
     */
    private static void checkServerConfig() {
        ServerConfiguration config = ScalingContext.getInstance().getServerConfig();
        Preconditions.checkNotNull(config, "Scaling server configuration is required.");
        Preconditions.checkNotNull(config.getGovernanceConfig(), "Governance configuration is required.");
    }
    
    /**
     * Lazy holder of the {@link ScalingAPI} instance.
     */
    private static final class ScalingAPIHolder {
        
        private static volatile ScalingAPI instance;
        
        public static ScalingAPI getInstance() {
            ScalingAPI result = instance;
            if (null != result) {
                return result;
            }
            synchronized (ScalingAPIFactory.class) {
                result = instance;
                if (null == result) {
                    checkServerConfig();
                    result = new ScalingAPIImpl();
                    instance = result;
                }
                return result;
            }
        }
    }
    
    /**
     * Lazy holder of the {@link GovernanceRepositoryAPI} instance.
     */
    private static final class GovernanceRepositoryAPIHolder {
        
        private static volatile GovernanceRepositoryAPI instance;
        
        static {
            ShardingSphereServiceLoader.register(RegistryCenterRepository.class);
        }
        
        public static GovernanceRepositoryAPI getInstance() {
            GovernanceRepositoryAPI result = instance;
            if (null != result) {
                return result;
            }
            synchronized (ScalingAPIFactory.class) {
                result = instance;
                if (null == result) {
                    result = createGovernanceRepositoryAPI();
                    instance = result;
                }
                return result;
            }
        }
        
        /**
         * Look up the registry center repository by its configured type, initialize it and wrap it.
         */
        private static GovernanceRepositoryAPI createGovernanceRepositoryAPI() {
            checkServerConfig();
            GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
            RegistryCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
            RegistryCenterRepository repository = TypedSPIRegistry.getRegisteredService(
                    RegistryCenterRepository.class, registryCenterConfig.getType(), registryCenterConfig.getProps());
            repository.init(governanceConfig.getName(), registryCenterConfig);
            return new GovernanceRepositoryAPIImpl(repository);
        }
    }
    
    /**
     * Lazy holder bundling the three ElasticJob APIs, all created against the same server list and namespace.
     */
    @Getter
    private static final class ElasticJobAPIHolder {
        
        private static volatile ElasticJobAPIHolder instance;
        
        private final JobStatisticsAPI jobStatisticsAPI;
        
        private final JobConfigurationAPI jobConfigurationAPI;
        
        private final JobOperateAPI jobOperateAPI;
        
        private ElasticJobAPIHolder() {
            checkServerConfig();
            GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
            RegistryCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
            String namespace = governanceConfig.getName() + ScalingConstant.SCALING_ROOT;
            jobStatisticsAPI = JobAPIFactory.createJobStatisticsAPI(registryCenterConfig.getServerLists(), namespace, null);
            jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(registryCenterConfig.getServerLists(), namespace, null);
            jobOperateAPI = JobAPIFactory.createJobOperateAPI(registryCenterConfig.getServerLists(), namespace, null);
        }
        
        public static ElasticJobAPIHolder getInstance() {
            ElasticJobAPIHolder result = instance;
            if (null != result) {
                return result;
            }
            synchronized (ScalingAPIFactory.class) {
                result = instance;
                if (null == result) {
                    result = new ElasticJobAPIHolder();
                    instance = result;
                }
                return result;
            }
        }
    }
    
    /**
     * Lazy holder of the initialized {@link CoordinatorRegistryCenter}.
     */
    private static final class RegistryCenterHolder {
        
        private static volatile CoordinatorRegistryCenter instance;
        
        public static CoordinatorRegistryCenter getInstance() {
            CoordinatorRegistryCenter result = instance;
            if (null != result) {
                return result;
            }
            synchronized (ScalingAPIFactory.class) {
                result = instance;
                if (null == result) {
                    result = createRegistryCenter();
                    instance = result;
                }
                return result;
            }
        }
        
        private static CoordinatorRegistryCenter createRegistryCenter() {
            CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(getZookeeperConfig());
            registryCenter.init();
            return registryCenter;
        }
        
        /**
         * Build the ZooKeeper configuration, letting registry center properties override the timing defaults.
         */
        private static ZookeeperConfiguration getZookeeperConfig() {
            checkServerConfig();
            GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getGovernanceConfig();
            RegistryCenterConfiguration registryCenterConfig = governanceConfig.getRegistryCenterConfiguration();
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryCenterConfig.getServerLists(), governanceConfig.getName() + ScalingConstant.SCALING_ROOT);
            Properties props = registryCenterConfig.getProps();
            zkConfig.setMaxSleepTimeMilliseconds(getProperty(props, "max.sleep.time.milliseconds", zkConfig.getMaxSleepTimeMilliseconds()));
            zkConfig.setBaseSleepTimeMilliseconds(getProperty(props, "base.sleep.time.milliseconds", zkConfig.getBaseSleepTimeMilliseconds()));
            zkConfig.setConnectionTimeoutMilliseconds(getProperty(props, "connection.timeout.milliseconds", zkConfig.getConnectionTimeoutMilliseconds()));
            zkConfig.setSessionTimeoutMilliseconds(getProperty(props, "session.timeout.milliseconds", zkConfig.getSessionTimeoutMilliseconds()));
            return zkConfig;
        }
        
        /**
         * Read an integer property, falling back to {@code defaultValue} when it is absent or empty.
         */
        private static int getProperty(final Properties props, final String key, final int defaultValue) {
            String configured = props.getProperty(key);
            if (Strings.isNullOrEmpty(configured)) {
                return defaultValue;
            }
            return Integer.parseInt(configured);
        }
    }
}
View Code

再看下包里的另一个文件 ScalingWorkerBootstrap,也是个有main方法的类,看名字先初始化配置再初始化工作者

/**
 * Command-line entry point of the ShardingSphere-Scaling worker process.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public final class ScalingWorkerBootstrap {
    
    /**
     * Initialize the server configuration and the scaling worker, then park the main thread.
     *
     * @param args startup arguments (not read by this method)
     */
    public static void main(final String[] args) {
        // CHECKSTYLE:ON
        log.info("ShardingSphere-Scaling Worker Startup");
        ServerConfigurationInitializer.init();
        ScalingWorker.init();
        wait0();
    }
    
    /**
     * Block the calling thread until it is interrupted.
     *
     * <p>The wait sits in a loop because {@link Object#wait()} may return without a notification (spurious wakeup),
     * which would otherwise let {@code main} return. On interruption the interrupt flag is restored before returning.</p>
     */
    private static synchronized void wait0() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ScalingWorkerBootstrap.class.wait();
            }
        } catch (final InterruptedException ex) {
            // Keep the interrupt visible to whoever runs after this method returns.
            Thread.currentThread().interrupt();
        }
    }
}

工作者是做什么工作的?通过事件、定时任务、注册中心驱动整个job的执行周期

/**
 * Event-bus subscriber that turns {@link StartScalingEvent}s into scaling jobs.
 */
@Slf4j
public final class ScalingWorker {
    
    private static final ScalingWorker INSTANCE = new ScalingWorker();
    
    private final ScalingAPI scalingAPI = ScalingAPIFactory.getScalingAPI();
    
    /**
     * Init scaling worker.
     *
     * <p>Registers the shared instance on the event bus and starts the finished-check and scaling job executors.</p>
     */
    public static void init() {
        ShardingSphereEventBus.getInstance().register(INSTANCE);
        new FinishedCheckJobExecutor().start();
        new ScalingJobExecutor().start();
    }
    
    /**
     * Start scaling job.
     *
     * <p>When the scaling API returns no job id, a {@link SwitchRuleConfigurationEvent} is posted right away.</p>
     *
     * @param event start scaling event.
     */
    @Subscribe
    public void start(final StartScalingEvent event) {
        log.info("Start scaling job by {}", event);
        if (scalingAPI.start(createJobConfig(event)).isPresent()) {
            return;
        }
        log.info("Switch rule configuration ruleCacheId = {} immediately.", event.getRuleCacheId());
        ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
    }
    
    /**
     * Build the job configuration (source/target rule configuration plus workflow handle) from the event.
     */
    private JobConfiguration createJobConfig(final StartScalingEvent event) {
        RuleConfiguration ruleConfig = new RuleConfiguration();
        ruleConfig.setSource(new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()).wrap());
        ruleConfig.setTarget(new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule()).wrap());
        WorkflowConfiguration workflowConfig = new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId());
        JobConfiguration jobConfig = new JobConfiguration();
        jobConfig.setRuleConfig(ruleConfig);
        jobConfig.setHandleConfig(new HandleConfiguration(workflowConfig));
        return jobConfig;
    }
}

ScalingJobExecutor 监听注册中心任务,收到通知后做对应的执行动作,相当于通过注册中心解耦了http请求接收和真正的执行处理

@Slf4j
public final class ScalingJobExecutor extends AbstractScalingExecutor {
    
    private static final Pattern CONFIG_PATTERN = Pattern.compile(ScalingConstant.SCALING_ROOT + "/(\d+)/config");
    
    private static final Set<String> EXECUTING_JOBS = Sets.newConcurrentHashSet();
    
    @Override
    public void start() {
        super.start();
        log.info("Start scaling job executor.");
        watchGovernanceRepositoryConfiguration();
    }
    
    private void watchGovernanceRepositoryConfiguration() {
        ScalingAPIFactory.getGovernanceRepositoryAPI().watch(ScalingConstant.SCALING_ROOT, event -> {
            Optional<JobConfigurationPOJO> jobConfigPOJOOptional = getJobConfigPOJO(event);
            if (!jobConfigPOJOOptional.isPresent()) {
                return;
            }
            JobConfigurationPOJO jobConfigPOJO = jobConfigPOJOOptional.get();
            if (DataChangedEvent.Type.DELETED == event.getType() || jobConfigPOJO.isDisabled()) {
                EXECUTING_JOBS.remove(jobConfigPOJO.getJobName());
                JobSchedulerCenter.stop(Long.parseLong(jobConfigPOJO.getJobName()));
                return;
            }
            switch (event.getType()) {
                case ADDED:
                case UPDATED:
                    execute(jobConfigPOJO);
                    break;
                default:
                    break;
            }
        });
    }
    
    private Optional<JobConfigurationPOJO> getJobConfigPOJO(final DataChangedEvent event) {
        try {
            if (CONFIG_PATTERN.matcher(event.getKey()).matches()) {
                log.info("{} job config: {} = {}", event.getType(), event.getKey(), event.getValue());
                return Optional.of(YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class));
            }
            // CHECKSTYLE:OFF
        } catch (final Exception ex) {
            // CHECKSTYLE:ON
            log.error("analyze job config pojo failed.", ex);
        }
        return Optional.empty();
    }
    
    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
        if (EXECUTING_JOBS.add(jobConfigPOJO.getJobName())) {
            new OneOffJobBootstrap(ScalingAPIFactory.getRegistryCenter(), new ScalingJob(), jobConfigPOJO.toJobConfiguration()).execute();
        }
    }
}

具体的伸缩操作分为几个阶段,官网文档写得不错,转载如下:

准备阶段:在准备阶段,弹性伸缩模块会进行数据源连通性及权限的校验,同时进行存量数据的统计、日志位点的记录,最后根据数据量和用户设置的并行度,对任务进行分片。

存量数据迁移阶段:执行在准备阶段拆分好的存量数据迁移作业,存量迁移阶段采用 JDBC 查询的方式,直接从数据节点中读取数据,并使用新规则写入到新集群中。

增量数据同步阶段:由于存量数据迁移耗费的时间受到数据量和并行度等因素影响,此时需要对这段时间内业务新增的数据进行同步。 不同的数据库使用的技术细节不同,但总体上均为基于复制协议或 WAL 日志实现的变更数据捕获功能。

  • MySQL:订阅并解析 binlog

  • PostgreSQL:采用官方逻辑复制

这些捕获的增量数据,同样会由弹性伸缩模块根据新规则写入到新数据节点中。当增量数据基本同步完成时(由于业务系统未停止,增量数据是不断的),则进入规则切换阶段。

规则切换阶段:在此阶段,可能存在一定时间的业务只读窗口期,通过设置数据库只读或ShardingSphere的熔断机制,让旧数据节点中的数据短暂静态,确保增量同步已完全完成。这个窗口期时间短则数秒,长则数分钟,取决于数据量和用户是否需要对数据进行强校验。 确认完成后,Apache ShardingSphere 可通过配置中心修改配置,将业务导向新规则的集群,弹性伸缩完成。

先不论其他,先来看下 MySQL binlog 同步相关的代码

MySQLClient 这个类整体就是启动一个 Netty 客户端连接 MySQL,按 MySQL 协议与服务端交互,伪装成从节点订阅(dump)binlog 等

/**
 * Netty-based MySQL protocol client that can run simple commands and subscribe to the binlog stream.
 *
 * <p>Commands are request/response: each one installs a fresh {@code responseCallback} promise,
 * writes a packet and blocks until the response handler completes the promise.
 * After {@link #subscribe(String, long)} the pipeline is switched to binlog decoding and received
 * events are buffered in a bounded queue drained through {@link #poll()}.</p>
 */
@RequiredArgsConstructor
@Slf4j
public final class MySQLClient {
    
    private final ConnectInfo connectInfo;
    
    private EventLoopGroup eventLoopGroup;
    
    private Channel channel;
    
    private Promise<Object> responseCallback;
    
    private final ArrayBlockingQueue<AbstractBinlogEvent> blockingEventQueue = new ArrayBlockingQueue<>(10000);
    
    private ServerInfo serverInfo;
    
    /**
     * Connect to MySQL.
     *
     * <p>Creates a new single-threaded event loop group, starts connecting and blocks until the
     * negotiation handler reports the {@code ServerInfo}.</p>
     */
    public synchronized void connect() {
        eventLoopGroup = new NioEventLoopGroup(1);
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        channel = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.AUTO_READ, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(final SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
                        socketChannel.pipeline().addLast(new MySQLNegotiatePackageDecoder());
                        socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
                        socketChannel.pipeline().addLast(new MySQLNegotiateHandler(connectInfo.getUsername(), connectInfo.getPassword(), responseCallback));
                        socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
                    }
                }).connect(connectInfo.getHost(), connectInfo.getPort()).channel();
        serverInfo = waitExpectedResponse(ServerInfo.class);
    }
    
    /**
     * Execute command.
     *
     * @param queryString query string
     * @return true if execute successfully, otherwise false
     */
    public synchronized boolean execute(final String queryString) {
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
        channel.writeAndFlush(comQueryPacket);
        return null != waitExpectedResponse(MySQLOKPacket.class);
    }
    
    /**
     * Execute update.
     *
     * @param queryString query string
     * @return affected rows
     */
    public synchronized int executeUpdate(final String queryString) {
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
        channel.writeAndFlush(comQueryPacket);
        return (int) Objects.requireNonNull(waitExpectedResponse(MySQLOKPacket.class)).getAffectedRows();
    }
    
    /**
     * Execute query.
     *
     * @param queryString query string
     * @return result set
     */
    public synchronized InternalResultSet executeQuery(final String queryString) {
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        MySQLComQueryPacket comQueryPacket = new MySQLComQueryPacket(queryString);
        channel.writeAndFlush(comQueryPacket);
        return waitExpectedResponse(InternalResultSet.class);
    }
    
    /**
     * Start dump binlog.
     *
     * @param binlogFileName binlog file name
     * @param binlogPosition binlog position
     */
    public synchronized void subscribe(final String binlogFileName, final long binlogPosition) {
        initDumpConnectSession();
        registerSlave();
        dumpBinlog(binlogFileName, binlogPosition, queryChecksumLength());
    }
    
    private void initDumpConnectSession() {
        if (serverInfo.getServerVersion().greaterThanOrEqualTo(5, 6, 0)) {
            execute("SET @MASTER_BINLOG_CHECKSUM= @@GLOBAL.BINLOG_CHECKSUM");
        }
    }
    
    private void registerSlave() {
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
        MySQLComRegisterSlaveCommandPacket packet = new MySQLComRegisterSlaveCommandPacket(
                connectInfo.getServerId(), localAddress.getHostName(), connectInfo.getUsername(), connectInfo.getPassword(), localAddress.getPort());
        channel.writeAndFlush(packet);
        waitExpectedResponse(MySQLOKPacket.class);
    }
    
    /**
     * Query the length in bytes of the checksum appended to each binlog event.
     *
     * @return 0 for servers older than 5.6.0 or when checksums are off, 4 for CRC32
     * @throws UnsupportedOperationException if the server reports any other checksum type
     */
    private int queryChecksumLength() {
        if (!serverInfo.getServerVersion().greaterThanOrEqualTo(5, 6, 0)) {
            return 0;
        }
        InternalResultSet resultSet = executeQuery("SELECT @@GLOBAL.BINLOG_CHECKSUM");
        String checksumType = resultSet.getFieldValues().get(0).getData().iterator().next().toString();
        // NOTE(review): compared case-insensitively because the server presumably reports "NONE" in upper case,
        // which the former exact match on "None" would have rejected - confirm against the MySQL version in use.
        switch (checksumType.toUpperCase(java.util.Locale.ROOT)) {
            case "NONE":
                return 0;
            case "CRC32":
                return 4;
            default:
                throw new UnsupportedOperationException(checksumType);
        }
    }
    
    /**
     * Swap the command handlers for binlog handlers and send the binlog dump command.
     */
    private void dumpBinlog(final String binlogFileName, final long binlogPosition, final int checksumLength) {
        // From here on no command responses are expected.
        responseCallback = null;
        channel.pipeline().remove(MySQLCommandPacketDecoder.class);
        channel.pipeline().remove(MySQLCommandResponseHandler.class);
        channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength));
        // The handler remembers the start position so a reconnect before the first event can still resume.
        channel.pipeline().addLast(new MySQLBinlogEventHandler(binlogFileName, binlogPosition));
        channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
    }
    
    /**
     * Poll binlog event.
     *
     * <p>Waits up to 100 milliseconds. If the thread is interrupted while waiting, the interrupt flag
     * is restored and {@code null} is returned.</p>
     *
     * @return binlog event, or {@code null} if none arrived in time
     */
    public synchronized AbstractBinlogEvent poll() {
        try {
            return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
    
    /**
     * Block until the current response promise completes and return its value as {@code type}.
     *
     * @param type exact class the response is expected to have
     * @param <T> expected response type
     * @return the response, or {@code null} if the promise completed with {@code null}
     * @throws RuntimeException if the server answered with an error packet or another type,
     *     or if waiting failed or was interrupted
     */
    @SuppressWarnings("unchecked")
    private <T> T waitExpectedResponse(final Class<T> type) {
        try {
            Object response = responseCallback.get();
            if (null == response) {
                return null;
            }
            if (type.equals(response.getClass())) {
                return (T) response;
            }
            if (response instanceof MySQLErrPacket) {
                throw new RuntimeException(((MySQLErrPacket) response).getErrorMessage());
            }
            throw new RuntimeException("unexpected response type");
        } catch (final InterruptedException ex) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        } catch (final ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }
    
    /**
     * Completes the pending response promise with whatever message or failure arrives.
     */
    private final class MySQLCommandResponseHandler extends ChannelInboundHandlerAdapter {
        
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
            if (null != responseCallback) {
                responseCallback.setSuccess(msg);
            }
        }
        
        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            if (null != responseCallback) {
                responseCallback.setFailure(cause);
                log.error("protocol resolution error", cause);
            }
        }
    }
    
    /**
     * Buffers decoded binlog events and re-subscribes from the last known position when the channel goes down.
     */
    private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {
        
        private final String startFileName;
        
        private final long startPosition;
        
        private AbstractBinlogEvent lastBinlogEvent;
        
        MySQLBinlogEventHandler(final String startFileName, final long startPosition) {
            this.startFileName = startFileName;
            this.startPosition = startPosition;
        }
        
        @Override
        public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
            if (msg instanceof AbstractBinlogEvent) {
                lastBinlogEvent = (AbstractBinlogEvent) msg;
                // Blocks while the queue is full.
                blockingEventQueue.put(lastBinlogEvent);
            }
        }
        
        @Override
        public void channelInactive(final ChannelHandlerContext ctx) {
            log.warn("channel inactive");
            reconnect();
        }
        
        @Override
        public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
            log.error("protocol resolution error", cause);
            // Only close here: waiting on the close future of a still-open channel from its own event loop
            // is rejected by Netty. Closing leads to channelInactive, which performs the reconnect.
            ctx.close();
        }
        
        private void reconnect() {
            log.info("reconnect mysql client.");
            closeOldChannel();
            // connect() replaces the event loop group; keep the old one so it can be released afterwards.
            EventLoopGroup staleEventLoopGroup = eventLoopGroup;
            try {
                connect();
                if (null == lastBinlogEvent) {
                    // No event was received on this connection yet: resume from where the dump was started.
                    subscribe(startFileName, startPosition);
                } else {
                    subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
                }
            } finally {
                staleEventLoopGroup.shutdownGracefully();
            }
        }
        
        private void closeOldChannel() {
            try {
                channel.closeFuture().sync();
            } catch (final InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
View Code
原文地址:https://www.cnblogs.com/it-worker365/p/14991976.html