学习 Flink(九):Async I/O

见文生意,Async I/O 是用于异步读写外部数据源的算子。

流表关联数据库维度表,可以使用 Async I/O 去实现。下图为同步 I/O 和异步 I/O 的对比:

举🌰

定义异步函数:

public abstract class JDBCAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

    /**
     * 数据源
     * */
    private HikariDataSource _datasource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // 初始化 JDBC 连接池
        if (this._datasource == null) {
            HikariConfig config = new HikariConfig(DB_CONFIG_PATH);
            this._datasource = new HikariDataSource(config);
        }
    }

    @Override
    public void close() throws Exception {
        if (this._threadPool != null) {
            this._threadPool.shutdown();
            this._threadPool = null;
        }

        super.close();
    }

    @Override
    public void asyncInvoke(IN input, ResultFuture<OUT> future) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            List<OUT> r = new LinkedList<>();

            // ①

            return r;
        }, Executors.directExecutor()).thenAccept(future::complete);
    }

    @Override
    public void timeout(IN input, ResultFuture<OUT> future) throws Exception {
        // ②
    }

}

① 执行 SQL 查询,将结果放入列表;

② 超时处理。

执行:

// 无序的
AsyncDataStream.unorderedWait(  
    stream, 
    new JDBCAsyncFunction(),
    1000, TimeUnit.MILLISECONDS, // ①
    100 // ②
);

// 有序的
AsyncDataStream.orderedWait(  
    stream, 
    new JDBCAsyncFunction(), 
    1000, TimeUnit.MILLISECONDS, // ①
    100 // ②
);

① 异步 I/O 的超时时间;

② 异步队列大小,定义了同一时间可以处理异步请求的数量。

参考