栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

FlinkTable HbaseConnector范例

FlinkTable HbaseConnector范例

Note: this source code is based on Flink 1.14, but it is meant to be used with Flink 1.12 as well. The connector name has not been changed yet — make sure the SPI factory identifier does not conflict with the built-in connector.

package connector.hbase.flink14.sink;


import connector.hbase.flink14base.options.HbaseWriteOptions;
import connector.hbase.flink14base.sink.HbaseSinkFunction;
import connector.hbase.flink14base.sink.RowDataToMutationConverter;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;


/**
 * Flink {@link DynamicTableSink} that writes {@link RowData} records into an HBase table in
 * upsert fashion: UPDATE_BEFORE rows are filtered out in {@link #getChangelogMode} because
 * HBase mutations are keyed by rowkey and the retraction carries no extra information.
 */
public class HbaseDynamicTableSink implements DynamicTableSink {

    private final String tableName;
    private final HbaseTableSchema hbaseTableSchema;
    private final Configuration hbaseConf;
    private final HbaseWriteOptions writeOptions;
    private final String nullStringLiteral;

    /**
     * @param tableName name of the HBase table to write to
     * @param hbaseTableSchema mapping between the SQL schema and HBase families/qualifiers
     * @param hbaseConf Hadoop/HBase client configuration
     * @param writeOptions buffering, flush and parallelism options for the sink
     * @param nullStringLiteral literal used to represent SQL NULL in string fields
     */
    public HbaseDynamicTableSink(
            String tableName,
            HbaseTableSchema hbaseTableSchema,
            Configuration hbaseConf,
            HbaseWriteOptions writeOptions,
            String nullStringLiteral) {

        this.tableName = tableName;
        this.hbaseTableSchema = hbaseTableSchema;
        this.hbaseConf = hbaseConf;
        this.writeOptions = writeOptions;
        this.nullStringLiteral = nullStringLiteral;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Restored the element type that was stripped from the original (raw HbaseSinkFunction):
        // this sink consumes RowData records.
        HbaseSinkFunction<RowData> sinkFunction =
                new HbaseSinkFunction<>(
                        tableName,
                        hbaseConf,
                        new RowDataToMutationConverter(hbaseTableSchema, nullStringLiteral),
                        writeOptions.getBufferFlushMaxSizeInBytes(),
                        writeOptions.getBufferFlushMaxRows(),
                        writeOptions.getBufferFlushIntervalMillis());
        return SinkFunctionProvider.of(sinkFunction, writeOptions.getParallelism());
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // UPSERT mode: drop UPDATE_BEFORE, keep everything else the planner requests.
        // Fixed typo from the original: RowKind.UPDATe_BEFORE (would not compile).
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : requestedMode.getContainedKinds()) {
            if (kind != RowKind.UPDATE_BEFORE) {
                builder.addContainedKind(kind);
            }
        }
        return builder.build();
    }

    @Override
    public DynamicTableSink copy() {
        return new HbaseDynamicTableSink(
                tableName, hbaseTableSchema, hbaseConf, writeOptions, nullStringLiteral);
    }

    @Override
    public String asSummaryString() {
        return "Hbase";
    }

    // -------------------------------------------------------------------------------------------

    @VisibleForTesting
    public HbaseTableSchema getHbaseTableSchema() {
        return this.hbaseTableSchema;
    }

    @VisibleForTesting
    public HbaseWriteOptions getWriteOptions() {
        return writeOptions;
    }

    @VisibleForTesting
    public Configuration getConfiguration() {
        return this.hbaseConf;
    }

    @VisibleForTesting
    public String getTableName() {
        return this.tableName;
    }
}


package connector.hbase.flink14.source;

import connector.hbase.flink14base.source.TableInputSplit;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.baseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.util.IOUtils;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public abstract class AbstractTableInputFormat extends RichInputFormat {

    protected static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.connector.hbase2.source.AbstractTableInputFormat.class);
    private static final long serialVersionUID = 1L;

    // helper variable to decide whether the input is exhausted or not
    protected boolean endReached = false;

    protected transient Connection connection = null;
    protected transient Table table = null;
    protected transient RegionLocator regionLocator = null;
    protected transient Scan scan = null;

    
    protected ResultScanner resultScanner = null;

    protected byte[] currentRow;
    protected long scannedRows;

    // Configuration is not serializable
    protected byte[] serializedConfig;

    public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
        serializedConfig = HbaseConfigurationUtil.serializeConfiguration(hConf);
    }

    
    protected abstract void initTable() throws IOException;

    
    protected abstract Scan getScanner();

    
    protected abstract String getTableName();

    
    protected abstract T mapResultToOutType(Result r);

    @Override
    public void configure(Configuration parameters) {}

    protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
        return HbaseConfigurationUtil.deserializeConfiguration(
                serializedConfig, HbaseConfigurationUtil.getHbaseConfiguration());
    }

    
    @Override
    public void open(TableInputSplit split) throws IOException {
        initTable();

        if (split == null) {
            throw new IOException("Input split is null!");
        }

        logSplitInfo("opening", split);

        // set scan range
        currentRow = split.getStartRow();
        scan.setStartRow(currentRow);
        scan.setStopRow(split.getEndRow());

        resultScanner = table.getScanner(scan);
        endReached = false;
        scannedRows = 0;
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        Result res;
        try {
            res = resultScanner.next();
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn(
                    "Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            scan.withStartRow(currentRow, false);
            resultScanner = table.getScanner(scan);
            res = resultScanner.next();
        }

        if (res != null) {
            scannedRows++;
            currentRow = res.getRow();
            return mapResultToOutType(res);
        }

        endReached = true;
        return null;
    }

    private void logSplitInfo(String action, TableInputSplit split) {
        int splitId = split.getSplitNumber();
        String splitStart = Bytes.toString(split.getStartRow());
        String splitEnd = Bytes.toString(split.getEndRow());
        String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
        String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
        String[] hostnames = split.getHostnames();
        LOG.info(
                "{} split (this={})[{}|{}|{}|{}]",
                action,
                this,
                splitId,
                hostnames,
                splitStartKey,
                splitStopKey);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
        IOUtils.closeQuietly(resultScanner);
        resultScanner = null;
        closeTable();
    }

    public void closeTable() {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase Table.", e);
            }
            table = null;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase Connection.", e);
            }
            connection = null;
        }
    }

    @Override
    public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
        try {
            initTable();

            // Get the starting and ending row keys for every region in the currently open table
            final Pair keys = regionLocator.getStartEndKeys();
            if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
                LOG.warn(
                        "Unexpected region keys: {} appeared in Hbase table: {}, all region information are: {}.",
                        keys,
                        table,
                        regionLocator.getAllRegionLocations());
                throw new IOException(
                        "Hbase Table expects at least one region in scan,"
                                + " please check the Hbase table status in Hbase cluster");
            }
            final byte[] startRow = scan.getStartRow();
            final byte[] stopRow = scan.getStopRow();
            final boolean scanWithNoLowerBound = startRow.length == 0;
            final boolean scanWithNoUpperBound = stopRow.length == 0;

            final List splits = new ArrayList<>(minNumSplits);
            for (int i = 0; i < keys.getFirst().length; i++) {
                final byte[] startKey = keys.getFirst()[i];
                final byte[] endKey = keys.getSecond()[i];
                final String regionLocation =
                        regionLocator.getRegionLocation(startKey, false).getHostnamePort();
                // Test if the given region is to be included in the InputSplit while splitting the
                // regions of a table
                if (!includeRegionInScan(startKey, endKey)) {
                    continue;
                }
                // Find the region on which the given row is being served
                final String[] hosts = new String[] {regionLocation};

                // Determine if regions contains keys used by the scan
                boolean isLastRegion = endKey.length == 0;
                if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
                        && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {

                    final byte[] splitStart =
                            scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0
                                    ? startKey
                                    : startRow;
                    final byte[] splitStop =
                            (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                                            && !isLastRegion
                                    ? endKey
                                    : stopRow;
                    int id = splits.size();
                    final TableInputSplit split =
                            new TableInputSplit(
                                    id, hosts, table.getName().getName(), splitStart, splitStop);
                    splits.add(split);
                }
            }
            LOG.info("Created " + splits.size() + " splits");
            for (TableInputSplit split : splits) {
                logSplitInfo("created", split);
            }
            return splits.toArray(new TableInputSplit[splits.size()]);
        } finally {
            closeTable();
        }
    }

    
    protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
        return true;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public baseStatistics getStatistics(baseStatistics cachedStatistics) {
        return null;
    }

    @VisibleForTesting
    public Connection getConnection() {
        return connection;
    }
}


package connector.hbase.flink14.source;

import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.source.AbstractHbaseDynamicTableSource;
import connector.hbase.flink14base.source.HbaseRowDataLookupFunction;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

import static org.apache.flink.util.Preconditions.checkArgument;


/**
 * HBase {@link DynamicTableSource} supporting batch/streaming scans via an input format and
 * point lookups on the rowkey, either synchronously or asynchronously depending on the
 * configured lookup options.
 */
public class HbaseDynamicTableSource extends AbstractHbaseDynamicTableSource {

    public HbaseDynamicTableSource(
            Configuration conf,
            String tableName,
            HbaseTableSchema hbaseSchema,
            String nullStringLiteral,
            HbaseLookupOptions lookupOptions) {
        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
    }

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // Lookups are only supported on exactly one key, and that key must be the rowkey field.
        final int[][] lookupKeys = context.getKeys();
        checkArgument(
                lookupKeys.length == 1 && lookupKeys[0].length == 1,
                "Currently, Hbase table can only be lookup by single rowkey.");
        checkArgument(
                hbaseSchema.getRowKeyName().isPresent(),
                "Hbase schema must have a row key when used in lookup mode.");
        checkArgument(
                hbaseSchema
                        .convertsToTableSchema()
                        .getTableColumn(lookupKeys[0][0])
                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
                        .isPresent(),
                "Currently, Hbase table only supports lookup by rowkey field.");
        // Pick the synchronous or asynchronous lookup runtime based on the options.
        if (!lookupOptions.getLookupAsync()) {
            return TableFunctionProvider.of(
                    new HbaseRowDataLookupFunction(
                            conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
        }
        return AsyncTableFunctionProvider.of(
                new HbaseRowDataAsyncLookupFunction(
                        conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
    }

    @Override
    public DynamicTableSource copy() {
        // Shallow copy sharing the (immutable) configuration objects.
        return new HbaseDynamicTableSource(
                conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
    }

    @Override
    protected InputFormat getInputFormat() {
        return new HbaseRowDataInputFormat(conf, tableName, hbaseSchema, nullStringLiteral);
    }

    @VisibleForTesting
    public HbaseLookupOptions getLookupOptions() {
        return this.lookupOptions;
    }
}


package connector.hbase.flink14.source;

import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.*;


public class HbaseRowDataAsyncLookupFunction extends AsyncTableFunction {

    private static final Logger LOG =
            LoggerFactory.getLogger(HbaseRowDataAsyncLookupFunction.class);
    private static final long serialVersionUID = 1L;

    private final String hTableName;
    private final byte[] serializedConfig;
    private final HbaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;

    private transient AsyncConnection asyncConnection;
    private transient AsyncTable table;
    private transient HbaseSerde serde;

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private transient Cache cache;

    
    private static final int THREAD_POOL_SIZE = 16;

    public HbaseRowDataAsyncLookupFunction(
            Configuration configuration,
            String hTableName,
            HbaseTableSchema hbaseTableSchema,
            String nullStringLiteral,
            HbaseLookupOptions lookupOptions) {
        this.serializedConfig = HbaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = hTableName;
        this.hbaseTableSchema = hbaseTableSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
    }

    @Override
    public void open(FunctionContext context) {
        LOG.info("start open ...");
        final ExecutorService threadPool =
                Executors.newFixedThreadPool(
                        THREAD_POOL_SIZE,
                        new ExecutorThreadFactory(
                                "hbase-async-lookup-worker", Threads.LOGGING_EXCEPTION_HANDLER));
        Configuration config = prepareRuntimeConfiguration();
        CompletableFuture asyncConnectionFuture =
                ConnectionFactory.createAsyncConnection(config);
        try {
            asyncConnection = asyncConnectionFuture.get();
            table = asyncConnection.getTable(TableName.valueOf(hTableName), threadPool);

            this.cache =
                    cacheMaxSize <= 0 || cacheExpireMs <= 0
                            ? null
                            : CacheBuilder.newBuilder()
                                    .recordStats()
                                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                                    .maximumSize(cacheMaxSize)
                                    .build();
            if (cache != null && context != null) {
                context.getMetricGroup()
                        .gauge("lookupCacheHitRate", (Gauge) () -> cache.stats().hitRate());
            }
        } catch (InterruptedException | ExecutionException e) {
            LOG.error("Exception while creating connection to Hbase.", e);
            throw new RuntimeException("Cannot create connection to Hbase.", e);
        }
        this.serde = new HbaseSerde(hbaseTableSchema, nullStringLiteral);
        LOG.info("end open.");
    }

    
    public void eval(CompletableFuture> future, Object rowKey) {
        int currentRetry = 0;
        if (cache != null) {
            RowData cacheRowData = cache.getIfPresent(rowKey);
            if (cacheRowData != null) {
                if (cacheRowData.getArity() == 0) {
                    future.complete(Collections.emptyList());
                } else {
                    future.complete(Collections.singletonList(cacheRowData));
                }
                return;
            }
        }
        // fetch result
        fetchResult(future, currentRetry, rowKey);
    }

    
    private void fetchResult(
            CompletableFuture> resultFuture, int currentRetry, Object rowKey) {
        Get get = serde.createGet(rowKey);
        CompletableFuture responseFuture = table.get(get);
        responseFuture.whenCompleteAsync(
                (result, throwable) -> {
                    if (throwable != null) {
                        if (throwable instanceof TableNotFoundException) {
                            LOG.error("Table '{}' not found ", hTableName, throwable);
                            resultFuture.completeExceptionally(
                                    new RuntimeException(
                                            "Hbase table '" + hTableName + "' not found.",
                                            throwable));
                        } else {
                            LOG.error(
                                    String.format(
                                            "Hbase asyncLookup error, retry times = %d",
                                            currentRetry),
                                    throwable);
                            if (currentRetry >= maxRetryTimes) {
                                resultFuture.completeExceptionally(throwable);
                            } else {
                                try {
                                    Thread.sleep(1000 * currentRetry);
                                } catch (InterruptedException e1) {
                                    resultFuture.completeExceptionally(e1);
                                }
                                fetchResult(resultFuture, currentRetry + 1, rowKey);
                            }
                        }
                    } else {
                        if (result.isEmpty()) {
                            resultFuture.complete(Collections.emptyList());
                            if (cache != null) {
                                cache.put(rowKey, new GenericRowData(0));
                            }
                        } else {
                            if (cache != null) {
                                RowData rowData = serde.convertToNewRow(result);
                                resultFuture.complete(Collections.singletonList(rowData));
                                cache.put(rowKey, rowData);
                            } else {
                                resultFuture.complete(
                                        Collections.singletonList(serde.convertToNewRow(result)));
                            }
                        }
                    }
                });
    }

    private Configuration prepareRuntimeConfiguration() {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // and overwrite configuration using serialized configuration from client-side env
        // (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        Configuration runtimeConfig =
                HbaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HbaseConfigurationUtil.getHbaseConfiguration());

        // do validation: check key option(s) in final runtime configuration
        if (StringUtils.isNullOrWhitespaceonly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "can not connect to Hbase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IllegalArgumentException(
                    "check Hbase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }

        return runtimeConfig;
    }

    @Override
    public void close() {
        LOG.info("start close ...");
        if (null != table) {
            table = null;
        }
        if (null != asyncConnection) {
            try {
                asyncConnection.close();
                asyncConnection = null;
            } catch (IOException e) {
                // ignore exception when close.
                LOG.warn("exception when close connection", e);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    public String getHTableName() {
        return hTableName;
    }
}


package connector.hbase.flink14.source;

import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;


/**
 * Input format that scans an HBase table and produces {@link RowData} records via
 * {@link HbaseSerde}.
 *
 * <p>Fixes over the original: restored the stripped {@code <RowData>} type argument on the
 * superclass (without it, {@link #mapResultToOutType(Result)} does not override anything),
 * and bound the logger to this class instead of an unrelated external one.
 */
public class HbaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HbaseRowDataInputFormat.class);

    private final String tableName;
    private final HbaseTableSchema schema;
    private final String nullStringLiteral;

    private transient HbaseSerde serde;

    /**
     * @param conf HBase client configuration
     * @param tableName name of the table to scan
     * @param schema mapping between the SQL schema and HBase families/qualifiers
     * @param nullStringLiteral literal representing SQL NULL in string fields
     */
    public HbaseRowDataInputFormat(
            org.apache.hadoop.conf.Configuration conf,
            String tableName,
            HbaseTableSchema schema,
            String nullStringLiteral) {
        super(conf);
        this.tableName = tableName;
        this.schema = schema;
        this.nullStringLiteral = nullStringLiteral;
    }

    @Override
    protected void initTable() throws IOException {
        // The serde is transient, so it must be recreated on every (re)open.
        this.serde = new HbaseSerde(schema, nullStringLiteral);
        if (table == null) {
            connectToTable();
        }
        if (table != null && scan == null) {
            scan = getScanner();
        }
    }

    @Override
    protected Scan getScanner() {
        return serde.createScan();
    }

    @Override
    public String getTableName() {
        return tableName;
    }

    @Override
    protected RowData mapResultToOutType(Result res) {
        // Reuses one internal row object per task for efficiency.
        return serde.convertToReusedRow(res);
    }

    private void connectToTable() throws IOException {
        try {
            if (connection == null) {
                connection = ConnectionFactory.createConnection(getHadoopConfiguration());
            }
            TableName name = TableName.valueOf(getTableName());
            table = connection.getTable(name);
            regionLocator = connection.getRegionLocator(name);
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + tableName + " not found ", tnfe);
            throw new RuntimeException("Hbase table '" + tableName + "' not found.", tnfe);
        }
    }
}


package connector.hbase.flink14;


import connector.hbase.flink14.sink.HbaseDynamicTableSink;
import connector.hbase.flink14.source.HbaseDynamicTableSource;
import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.options.HbaseWriteOptions;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.conf.Configuration;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static connector.hbase.flink14base.table.HbaseConnectorOptions.*;
import static connector.hbase.flink14base.table.HbaseConnectorOptionsUtil.*;
import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;


@Internal
public class Hbase2DynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    private static final String IDENTIFIER = "hbase-2.2";

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validateExcept(PROPERTIES_PREFIX);

        final ReadableConfig tableOptions = helper.getOptions();

        TableSchema tableSchema = context.getCatalogTable().getSchema();
        Map options = context.getCatalogTable().getOptions();

        validatePrimaryKey(tableSchema);

        String tableName = tableOptions.get(TABLE_NAME);
        Configuration hbaseConf = getHbaseConfiguration(options);
        HbaseLookupOptions lookupOptions = getHbaseLookupOptions(tableOptions);
        String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
        HbaseTableSchema hbaseSchema = HbaseTableSchema.fromTableSchema(tableSchema);

        return new HbaseDynamicTableSource(
                hbaseConf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
        helper.validateExcept(PROPERTIES_PREFIX);

        final ReadableConfig tableOptions = helper.getOptions();

        TableSchema tableSchema = context.getCatalogTable().getSchema();
        Map options = context.getCatalogTable().getOptions();

        validatePrimaryKey(tableSchema);

        String tableName = tableOptions.get(TABLE_NAME);
        Configuration hbaseConf = getHbaseConfiguration(options);
        HbaseWriteOptions hbaseWriteOptions = getHbaseWriteOptions(tableOptions);
        String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
        HbaseTableSchema hbaseSchema = HbaseTableSchema.fromTableSchema(tableSchema);

        return new HbaseDynamicTableSink(
                tableName, hbaseSchema, hbaseConf, hbaseWriteOptions, nullStringLiteral);
    }

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set> requiredOptions() {
        Set> set = new HashSet<>();
        set.add(TABLE_NAME);
        return set;
    }

    @Override
    public Set> optionalOptions() {
        Set> set = new HashSet<>();
        set.add(ZOOKEEPER_ZNODE_PARENT);
        set.add(ZOOKEEPER_QUORUM);
        set.add(NULL_STRING_LITERAL);
        set.add(SINK_BUFFER_FLUSH_MAX_SIZE);
        set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
        set.add(SINK_BUFFER_FLUSH_INTERVAL);
        set.add(SINK_PARALLELISM);
        set.add(LOOKUP_ASYNC);
        set.add(LOOKUP_CACHE_MAX_ROWS);
        set.add(LOOKUP_CACHE_TTL);
        set.add(LOOKUP_MAX_RETRIES);
        return set;
    }
}


package connector.hbase.flink14base.options;

import java.io.Serializable;
import java.util.Objects;


public class HbaseLookupOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_MAX_RETRY_TIMES = 3;

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    private final boolean lookupAsync;

    public HbaseLookupOptions(
            long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean lookupAsync) {
        this.cacheMaxSize = cacheMaxSize;
        this.cacheExpireMs = cacheExpireMs;
        this.maxRetryTimes = maxRetryTimes;
        this.lookupAsync = lookupAsync;
    }

    public long getCacheMaxSize() {
        return cacheMaxSize;
    }

    public long getCacheExpireMs() {
        return cacheExpireMs;
    }

    public int getMaxRetryTimes() {
        return maxRetryTimes;
    }

    public boolean getLookupAsync() {
        return lookupAsync;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof HbaseLookupOptions) {
            HbaseLookupOptions options = (HbaseLookupOptions) o;
            return Objects.equals(cacheMaxSize, options.cacheMaxSize)
                    && Objects.equals(cacheExpireMs, options.cacheExpireMs)
                    && Objects.equals(maxRetryTimes, options.maxRetryTimes)
                    && Objects.equals(lookupAsync, options.lookupAsync);
        } else {
            return false;
        }
    }

    
    public static class Builder {
        private long cacheMaxSize = -1L;
        private long cacheExpireMs = 0L;
        private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
        private boolean lookupAsync = false;

        
        public Builder setCacheMaxSize(long cacheMaxSize) {
            this.cacheMaxSize = cacheMaxSize;
            return this;
        }

        
        public Builder setCacheExpireMs(long cacheExpireMs) {
            this.cacheExpireMs = cacheExpireMs;
            return this;
        }

        
        public Builder setMaxRetryTimes(int maxRetryTimes) {
            this.maxRetryTimes = maxRetryTimes;
            return this;
        }

        
        public Builder setLookupAsync(boolean lookupAsync) {
            this.lookupAsync = lookupAsync;
            return this;
        }

        public HbaseLookupOptions build() {
            return new HbaseLookupOptions(cacheMaxSize, cacheExpireMs, maxRetryTimes, lookupAsync);
        }
    }
}


package connector.hbase.flink14base.options;

import org.apache.hadoop.hbase.client.ConnectionConfiguration;

import java.io.Serializable;
import java.util.Objects;


/**
 * Immutable buffering options for the HBase sink: flush thresholds by size, row
 * count and interval, plus an optional sink parallelism. Create via {@link #builder()}.
 */
public class HbaseWriteOptions implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxRows;
    private final long bufferFlushIntervalMillis;
    // Nullable: null means "use the planner-derived parallelism".
    private final Integer parallelism;

    private HbaseWriteOptions(
            long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations,
            long bufferFlushIntervalMillis,
            Integer parallelism) {
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxRows = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
        this.parallelism = parallelism;
    }

    public long getBufferFlushMaxSizeInBytes() {
        return bufferFlushMaxSizeInBytes;
    }

    public long getBufferFlushMaxRows() {
        return bufferFlushMaxRows;
    }

    public long getBufferFlushIntervalMillis() {
        return bufferFlushIntervalMillis;
    }

    public Integer getParallelism() {
        return parallelism;
    }

    @Override
    public String toString() {
        return "HbaseWriteOptions{"
                + "bufferFlushMaxSizeInBytes="
                + bufferFlushMaxSizeInBytes
                + ", bufferFlushMaxRows="
                + bufferFlushMaxRows
                + ", bufferFlushIntervalMillis="
                + bufferFlushIntervalMillis
                + ", parallelism="
                + parallelism
                + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        HbaseWriteOptions that = (HbaseWriteOptions) o;
        // parallelism is a (possibly null) boxed Integer; '==' would compare
        // references and fail for values outside the Integer cache — use
        // Objects.equals for a null-safe value comparison.
        return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes
                && bufferFlushMaxRows == that.bufferFlushMaxRows
                && bufferFlushIntervalMillis == that.bufferFlushIntervalMillis
                && Objects.equals(parallelism, that.parallelism);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                bufferFlushMaxSizeInBytes,
                bufferFlushMaxRows,
                bufferFlushIntervalMillis,
                parallelism);
    }

    /** Creates a builder preloaded with the HBase client's default write-buffer size. */
    public static Builder builder() {
        return new Builder();
    }

    /** Builder for {@link HbaseWriteOptions}. */
    public static class Builder {

        private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
        private long bufferFlushMaxRows = 0;
        private long bufferFlushIntervalMillis = 0;
        private Integer parallelism;

        /** Sets the max buffered size in bytes before a flush; non-positive disables it. */
        public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) {
            this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
            return this;
        }

        /** Sets the max buffered row count before a flush; non-positive disables it. */
        public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) {
            this.bufferFlushMaxRows = bufferFlushMaxRows;
            return this;
        }

        /** Sets the periodic flush interval in milliseconds; non-positive disables it. */
        public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) {
            this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
            return this;
        }

        /** Sets an explicit sink parallelism; null keeps the planner default. */
        public Builder setParallelism(Integer parallelism) {
            this.parallelism = parallelism;
            return this;
        }

        /** Builds the immutable options instance. */
        public HbaseWriteOptions build() {
            return new HbaseWriteOptions(
                    bufferFlushMaxSizeInBytes,
                    bufferFlushMaxRows,
                    bufferFlushIntervalMillis,
                    parallelism);
        }
    }
}


package connector.hbase.flink14base.sink;

import org.apache.flink.annotation.Internal;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

import java.io.Serializable;


/**
 * Serializable converter from a record of type {@code T} to an HBase {@link Mutation}
 * (Put or Delete), used by the HBase sink.
 *
 * @param <T> the record type to convert (restored: the scraped copy lost the generic parameter)
 */
@Internal
public interface HbaseMutationConverter<T> extends Serializable {

    /** Initializes the converter on the task side; called once before any conversion. */
    void open();

    /** Converts the given record into an HBase {@link Mutation}. */
    Mutation convertToMutation(T record);
}


package connector.hbase.flink14base.sink;

import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;


/**
 * Sink function that writes records to an HBase table through a {@link BufferedMutator},
 * flushing by buffered byte size, mutation count and/or a periodic interval. Buffered
 * mutations are flushed on checkpoint for at-least-once semantics.
 *
 * @param <T> record type, converted to mutations by the configured converter
 */
@Internal
public class HbaseSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {

    private static final long serialVersionUID = 1L;
    // Fixed: the original obtained the logger for a foreign class
    // (org.apache.flink.connector.hbase.sink.HbaseSinkFunction) instead of this one.
    private static final Logger LOG = LoggerFactory.getLogger(HbaseSinkFunction.class);

    private final String hTableName;
    private final byte[] serializedConfig;

    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private final HbaseMutationConverter<T> mutationConverter;

    private transient Connection connection;
    private transient BufferedMutator mutator;

    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient AtomicLong numPendingRequests;

    private transient volatile boolean closed = false;

    /** First asynchronous failure observed by the flusher thread or the HBase listener. */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /**
     * @param hTableName name of the target HBase table
     * @param conf HBase client configuration (serialized, since it is not Serializable)
     * @param mutationConverter converts incoming records to Put/Delete mutations
     * @param bufferFlushMaxSizeInBytes flush when buffered bytes exceed this; <=0 disables
     * @param bufferFlushMaxMutations flush when buffered mutations reach this; <=0 disables
     * @param bufferFlushIntervalMillis periodic flush interval; <=0 disables
     */
    public HbaseSinkFunction(
            String hTableName,
            org.apache.hadoop.conf.Configuration conf,
            HbaseMutationConverter<T> mutationConverter,
            long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations,
            long bufferFlushIntervalMillis) {
        this.hTableName = hTableName;
        // Configuration is not serializable
        this.serializedConfig = HbaseConfigurationUtil.serializeConfiguration(conf);
        this.mutationConverter = mutationConverter;
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0);

            if (null == connection) {
                this.connection = ConnectionFactory.createConnection(config);
            }
            // create a parameter instance, set the table name and custom listener reference.
            BufferedMutatorParams params =
                    new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
            if (bufferFlushMaxSizeInBytes > 0) {
                params.writeBufferSize(bufferFlushMaxSizeInBytes);
            }
            this.mutator = connection.getBufferedMutator(params);

            // Periodic flusher is skipped when max-mutations == 1 because every
            // invoke() already flushes immediately in that case.
            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
                this.executor =
                        Executors.newScheduledThreadPool(
                                1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture =
                        this.executor.scheduleWithFixedDelay(
                                () -> {
                                    if (closed) {
                                        return;
                                    }
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        // fail the sink and skip the rest of the items
                                        // if the failure handler decides to throw an exception
                                        failureThrowable.compareAndSet(null, e);
                                    }
                                },
                                bufferFlushIntervalMillis,
                                bufferFlushIntervalMillis,
                                TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + hTableName + " not found ", tnfe);
            throw new RuntimeException("Hbase table '" + hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to Hbase.", ioe);
            throw new RuntimeException("Cannot create connection to Hbase.", ioe);
        }
        LOG.info("end open.");
    }

    /**
     * Builds the runtime HBase configuration: classpath defaults overridden by the
     * serialized client-side configuration, then validates the ZooKeeper quorum.
     */
    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // and overwrite configuration using serialized configuration from client-side env
        // (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        org.apache.hadoop.conf.Configuration runtimeConfig =
                HbaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HbaseConfigurationUtil.getHbaseConfiguration());

        // do validation: check key option(s) in final runtime configuration
        // (fixed method-name casing: isNullOrWhitespaceOnly)
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "Can not connect to Hbase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IOException(
                    "Check Hbase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }

        return runtimeConfig;
    }

    /** Rethrows the first asynchronous failure recorded by the flusher or listener. */
    private void checkErrorAndRethrow() {
        Throwable cause = failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in HbaseSink.", cause);
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        checkErrorAndRethrow();

        mutator.mutate(mutationConverter.convertToMutation(value));

        // flush when the buffer number of mutations greater than the configured max size.
        if (bufferFlushMaxMutations > 0
                && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }

    private void flush() throws IOException {
        // BufferedMutator is thread-safe
        mutator.flush();
        numPendingRequests.set(0);
        checkErrorAndRethrow();
    }

    @Override
    public void close() throws Exception {
        closed = true;

        if (mutator != null) {
            try {
                mutator.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase BufferedMutator.", e);
            }
            this.mutator = null;
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase Connection.", e);
            }
            this.connection = null;
        }

        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Drain the buffer before the checkpoint completes (at-least-once).
        while (numPendingRequests.get() != 0) {
            flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to do.
    }

    @Override
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
            throws RetriesExhaustedWithDetailsException {
        // fail the sink and skip the rest of the items
        // if the failure handler decides to throw an exception
        failureThrowable.compareAndSet(null, exception);
    }
}


package connector.hbase.flink14base.sink;


import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.hbase.client.Mutation;


/**
 * Converts Flink changelog {@link RowData} records into HBase mutations using
 * {@code HbaseSerde}: INSERT/UPDATE_AFTER become a Put, DELETE/UPDATE_BEFORE a Delete.
 */
public class RowDataToMutationConverter implements HbaseMutationConverter<RowData> {
    private static final long serialVersionUID = 1L;

    private final HbaseTableSchema schema;
    private final String nullStringLiteral;
    // Rebuilt on the task side in open(); not shipped with the converter.
    private transient HbaseSerde serde;

    public RowDataToMutationConverter(HbaseTableSchema schema, final String nullStringLiteral) {
        this.schema = schema;
        this.nullStringLiteral = nullStringLiteral;
    }

    @Override
    public void open() {
        this.serde = new HbaseSerde(schema, nullStringLiteral);
    }

    @Override
    public Mutation convertToMutation(RowData record) {
        RowKind kind = record.getRowKind();
        // Fixed constant casing: RowKind.UPDATE_AFTER (original had UPDATe_AFTER,
        // which does not compile).
        if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
            return serde.createPutMutation(record);
        } else {
            return serde.createDeleteMutation(record);
        }
    }
}


package connector.hbase.flink14base.source;

import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.conf.Configuration;

import static org.apache.flink.util.Preconditions.checkArgument;


/**
 * Base class for HBase table sources: provides scan via an {@link InputFormat},
 * lookup by single rowkey via {@code HbaseRowDataLookupFunction}, and projection
 * push-down by narrowing the {@link HbaseTableSchema}.
 */
@Internal
public abstract class AbstractHbaseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {

    protected final Configuration conf;
    protected final String tableName;
    // Mutable: narrowed in applyProjection().
    protected HbaseTableSchema hbaseSchema;
    protected final String nullStringLiteral;
    protected final HbaseLookupOptions lookupOptions;

    public AbstractHbaseDynamicTableSource(
            Configuration conf,
            String tableName,
            HbaseTableSchema hbaseSchema,
            String nullStringLiteral,
            HbaseLookupOptions lookupOptions) {
        this.conf = conf;
        this.tableName = tableName;
        this.hbaseSchema = hbaseSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.lookupOptions = lookupOptions;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return InputFormatProvider.of(getInputFormat());
    }

    /** Concrete sources supply the scan InputFormat (restored generic signature). */
    protected abstract InputFormat<RowData, ?> getInputFormat();

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        checkArgument(
                context.getKeys().length == 1 && context.getKeys()[0].length == 1,
                "Currently, Hbase table can only be lookup by single rowkey.");
        checkArgument(
                hbaseSchema.getRowKeyName().isPresent(),
                "Hbase schema must have a row key when used in lookup mode.");
        checkArgument(
                hbaseSchema
                        .convertsToTableSchema()
                        .getTableColumn(context.getKeys()[0][0])
                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
                        .isPresent(),
                "Currently, Hbase table only supports lookup by rowkey field.");

        return TableFunctionProvider.of(
                new HbaseRowDataLookupFunction(
                        conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
    }

    @Override
    public boolean supportsNestedProjection() {
        // planner doesn't support nested projection push down yet.
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        TableSchema projectSchema =
                TableSchemaUtils.projectSchema(
                        hbaseSchema.convertsToTableSchema(), projectedFields);
        this.hbaseSchema = HbaseTableSchema.fromTableSchema(projectSchema);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // Fixed method-name casing: insertOnly (original `insertonly` does not exist).
        return ChangelogMode.insertOnly();
    }

    @Override
    public String asSummaryString() {
        return "Hbase";
    }

    // -------------------------------------------------------------------------------------------

    @VisibleForTesting
    public HbaseTableSchema getHbaseTableSchema() {
        return this.hbaseSchema;
    }
}


package connector.hbase.flink14base.source;

import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


/**
 * Table function performing point lookups ({@code GET} by rowkey) against an HBase
 * table, with an optional in-memory Guava cache and bounded retries.
 */
@Internal
public class HbaseRowDataLookupFunction extends TableFunction<RowData> {

    // Fixed: the original obtained the logger for a foreign class
    // (org.apache.flink.connector.hbase.source.HbaseRowDataLookupFunction).
    private static final Logger LOG = LoggerFactory.getLogger(HbaseRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;

    private final String hTableName;
    // Hadoop Configuration is not serializable; shipped as bytes.
    private final byte[] serializedConfig;
    private final HbaseTableSchema hbaseTableSchema;
    private final String nullStringLiteral;

    private transient Connection hConnection;
    private transient HTable table;
    private transient HbaseSerde serde;

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final int maxRetryTimes;
    // Enabled only when both cacheMaxSize and cacheExpireMs are > 0.
    private transient Cache<Object, RowData> cache;

    public HbaseRowDataLookupFunction(
            Configuration configuration,
            String hTableName,
            HbaseTableSchema hbaseTableSchema,
            String nullStringLiteral,
            HbaseLookupOptions lookupOptions) {
        this.serializedConfig = HbaseConfigurationUtil.serializeConfiguration(configuration);
        this.hTableName = hTableName;
        this.hbaseTableSchema = hbaseTableSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
    }

    /**
     * Lookup entry point invoked by the planner with the join key (the rowkey).
     * Serves from the cache when possible, otherwise issues a GET with up to
     * {@code maxRetryTimes} retries (linear back-off).
     */
    public void eval(Object rowKey) throws IOException {
        if (cache != null) {
            RowData cacheRowData = cache.getIfPresent(rowKey);
            if (cacheRowData != null) {
                collect(cacheRowData);
                return;
            }
        }
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                // fetch result
                Get get = serde.createGet(rowKey);
                if (get != null) {
                    Result result = table.get(get);
                    if (!result.isEmpty()) {
                        if (cache != null) {
                            // parse and collect; a fresh row is required because the
                            // cached instance must not be reused/mutated afterwards
                            RowData rowData = serde.convertToNewRow(result);
                            collect(rowData);
                            cache.put(rowKey, rowData);
                        } else {
                            collect(serde.convertToReusedRow(result));
                        }
                    }
                }
                break;
            } catch (IOException e) {
                LOG.error(String.format("Hbase lookup error, retry times = %d", retry), e);
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Execution of Hbase lookup failed.", e);
                }
                try {
                    Thread.sleep(1000 * retry);
                } catch (InterruptedException e1) {
                    // restore the interrupt flag before bailing out
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e1);
                }
            }
        }
    }

    /**
     * Builds the runtime HBase configuration: classpath defaults overridden by the
     * serialized client-side configuration, then validates the ZooKeeper quorum.
     */
    private Configuration prepareRuntimeConfiguration() {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // and overwrite configuration using serialized configuration from client-side env
        // (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        Configuration runtimeConfig =
                HbaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HbaseConfigurationUtil.getHbaseConfiguration());

        // do validation: check key option(s) in final runtime configuration
        // (fixed method-name casing: isNullOrWhitespaceOnly)
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "can not connect to Hbase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IllegalArgumentException(
                    "check Hbase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }

        return runtimeConfig;
    }

    @Override
    public void open(FunctionContext context) {
        LOG.info("start open ...");
        Configuration config = prepareRuntimeConfiguration();
        try {
            hConnection = ConnectionFactory.createConnection(config);
            table = (HTable) hConnection.getTable(TableName.valueOf(hTableName));
            this.cache =
                    cacheMaxSize <= 0 || cacheExpireMs <= 0
                            ? null
                            : CacheBuilder.newBuilder()
                                    .recordStats()
                                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                                    .maximumSize(cacheMaxSize)
                                    .build();
            if (cache != null) {
                // expose the hit rate so cache effectiveness is observable
                context.getMetricGroup()
                        .gauge(
                                "lookupCacheHitRate",
                                (Gauge<Double>) () -> cache.stats().hitRate());
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("Table '{}' not found ", hTableName, tnfe);
            throw new RuntimeException("Hbase table '" + hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to Hbase.", ioe);
            throw new RuntimeException("Cannot create connection to Hbase.", ioe);
        }
        this.serde = new HbaseSerde(hbaseTableSchema, nullStringLiteral);
        LOG.info("end open.");
    }

    @Override
    public void close() {
        LOG.info("start close ...");
        if (null != table) {
            try {
                table.close();
                table = null;
            } catch (IOException e) {
                // ignore exception when close.
                LOG.warn("exception when close table", e);
            }
        }
        if (null != hConnection) {
            try {
                hConnection.close();
                hConnection = null;
            } catch (IOException e) {
                // ignore exception when close.
                LOG.warn("exception when close connection", e);
            }
        }
        LOG.info("end close.");
    }

    @VisibleForTesting
    public String getHTableName() {
        return hTableName;
    }
}


package connector.hbase.flink14base.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.LocatableInputSplit;


/**
 * Locatable input split describing one contiguous rowkey range of an HBase table,
 * produced by the scan source and handed to the input format.
 */
@Internal
public class TableInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    /** Name of the HBase table this split reads from. */
    private final byte[] tableName;

    /** First row of the split's scan range. */
    private final byte[] startRow;

    /** Last row boundary of the split's scan range. */
    private final byte[] endRow;

    /**
     * Creates a split for the given row range of the given table.
     *
     * @param splitNumber index of this split
     * @param hostnames hosts where the split's data is local
     * @param tableName serialized table name
     * @param startRow first row of the range
     * @param endRow end boundary of the range
     */
    public TableInputSplit(
            final int splitNumber,
            final String[] hostnames,
            final byte[] tableName,
            final byte[] startRow,
            final byte[] endRow) {
        super(splitNumber, hostnames);
        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
    }

    /** Returns the serialized table name. */
    public byte[] getTableName() {
        return tableName;
    }

    /** Returns the first row of this split's range. */
    public byte[] getStartRow() {
        return startRow;
    }

    /** Returns the end boundary of this split's range. */
    public byte[] getEndRow() {
        return endRow;
    }
}


package connector.hbase.flink14base.table;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.factories.FactoryUtil;

import java.time.Duration;


@PublicEvolving
public class HbaseConnectorOptions {

    public static final ConfigOption TABLE_NAME =
            ConfigOptions.key("table-name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The name of Hbase table to connect.");

    public static final ConfigOption ZOOKEEPER_QUORUM =
            ConfigOptions.key("zookeeper.quorum")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The Hbase Zookeeper quorum.");

    public static final ConfigOption ZOOKEEPER_ZNODE_PARENT =
            ConfigOptions.key("zookeeper.znode.parent")
                    .stringType()
                    .defaultValue("/hbase")
                    .withDescription("The root dir in Zookeeper for Hbase cluster.");

    public static final ConfigOption NULL_STRING_LITERAL =
            ConfigOptions.key("null-string-literal")
                    .stringType()
                    .defaultValue("null")
                    .withDescription(
                            "Representation for null values for string fields. Hbase source and "
                                    + "sink encodes/decodes empty bytes as null values for all types except string type.");

    public static final ConfigOption SINK_BUFFER_FLUSH_MAX_SIZE =
            ConfigOptions.key("sink.buffer-flush.max-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("2mb"))
                    .withDescription(
                            "Writing option, maximum size in memory of buffered rows for each "
                                    + "writing request. This can improve performance for writing data to Hbase database, "
                                    + "but may increase the latency. Can be set to '0' to disable it. ");

    public static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS =
            ConfigOptions.key("sink.buffer-flush.max-rows")
                    .intType()
                    .defaultValue(1000)
                    .withDescription(
                            "Writing option, maximum number of rows to buffer for each writing request. "
                                    + "This can improve performance for writing data to Hbase database, but may increase the latency. "
                                    + "Can be set to '0' to disable it.");

    public static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL =
            ConfigOptions.key("sink.buffer-flush.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(1))
                    .withDescription(
                            "Writing option, the interval to flush any buffered rows. "
                                    + "This can improve performance for writing data to Hbase database, but may increase the latency. "
                                    + "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                    + "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");

    public static final ConfigOption LOOKUP_ASYNC =
            ConfigOptions.key("lookup.async")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("whether to set async lookup.");

    public static final ConfigOption LOOKUP_CACHE_MAX_ROWS =
            ConfigOptions.key("lookup.cache.max-rows")
                    .longType()
                    .defaultValue(-1L)
                    .withDescription(
                            "the max number of rows of lookup cache, over this value, the oldest rows will "
                                    + "be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any of them is "
                                    + "specified. Cache is not enabled as default.");

    public static final ConfigOption LOOKUP_CACHE_TTL =
            ConfigOptions.key("lookup.cache.ttl")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(0))
                    .withDescription("the cache time to live.");

    public static final ConfigOption LOOKUP_MAX_RETRIES =
            ConfigOptions.key("lookup.max-retries")
                    .intType()
                    .defaultValue(3)
                    .withDescription("the max retry times if lookup database failed.");

    public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

    private HbaseConnectorOptions() {}
}


package connector.hbase.flink14base.table;

import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.options.HbaseWriteOptions;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

import java.util.Map;
import java.util.Properties;

import static connector.hbase.flink14base.table.HbaseConnectorOptions.*;



/**
 * Utilities for validating table schemas and translating Flink table options into the
 * connector's Hbase-specific option objects and Hadoop {@link Configuration}.
 */
public class HbaseConnectorOptionsUtil {

    /** Prefix for arbitrary Hbase client properties passed through the table options. */
    public static final String PROPERTIES_PREFIX = "properties.";

    // --------------------------------------------------------------------------------------------
    // Validation
    // --------------------------------------------------------------------------------------------

    /**
     * Checks that the table declares exactly one row key field and that any declared primary
     * key is a single column matching that row key field.
     *
     * @param schema the Flink table schema to validate
     * @throws IllegalArgumentException if no row key is defined, the primary key spans multiple
     *     columns, or the primary key is not the row key field
     */
    public static void validatePrimaryKey(TableSchema schema) {
        HbaseTableSchema hbaseSchema = HbaseTableSchema.fromTableSchema(schema);
        if (!hbaseSchema.getRowKeyName().isPresent()) {
            throw new IllegalArgumentException(
                    "Hbase table requires to define a row key field. "
                            + "A row key field is defined as an atomic type, "
                            + "column families and qualifiers are defined as ROW type.");
        }
        schema.getPrimaryKey()
                .ifPresent(
                        k -> {
                            if (k.getColumns().size() > 1) {
                                throw new IllegalArgumentException(
                                        "Hbase table doesn't support a primary Key on multiple columns. "
                                                + "The primary key of Hbase table must be defined on row key field.");
                            }
                            if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
                                throw new IllegalArgumentException(
                                        "Primary key of Hbase table must be defined on the row key field. "
                                                + "A row key field is defined as an atomic type, "
                                                + "column families and qualifiers are defined as ROW type.");
                            }
                        });
    }

    /**
     * Builds sink write options (buffer flush interval/size/rows, parallelism) from the
     * declared table options.
     */
    public static HbaseWriteOptions getHbaseWriteOptions(ReadableConfig tableOptions) {
        HbaseWriteOptions.Builder builder = HbaseWriteOptions.builder();
        builder.setBufferFlushIntervalMillis(
                tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
        builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
        builder.setBufferFlushMaxSizeInBytes(
                tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
        // Parallelism is optional; null means "inherit from the planner".
        builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
        return builder.build();
    }

    /** Builds lookup options (async flag, retries, cache TTL/size) from the table options. */
    public static HbaseLookupOptions getHbaseLookupOptions(ReadableConfig tableOptions) {
        HbaseLookupOptions.Builder builder = HbaseLookupOptions.builder();
        builder.setLookupAsync(tableOptions.get(HbaseConnectorOptions.LOOKUP_ASYNC));
        builder.setMaxRetryTimes(tableOptions.get(HbaseConnectorOptions.LOOKUP_MAX_RETRIES));
        builder.setCacheExpireMs(
                tableOptions.get(HbaseConnectorOptions.LOOKUP_CACHE_TTL).toMillis());
        builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS));
        return builder.build();
    }

    /**
     * Creates a Hadoop {@link Configuration} for the Hbase client: starts from the runtime
     * environment (e.g. {@code hbase-site.xml} on the classpath), then overlays the ZooKeeper
     * quorum/znode options and any {@code properties.*} pass-through options.
     *
     * @param options raw string table options as declared in the DDL
     */
    public static Configuration getHbaseConfiguration(Map<String, String> options) {
        org.apache.flink.configuration.Configuration tableOptions =
                org.apache.flink.configuration.Configuration.fromMap(options);
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        Configuration hbaseClientConf = HbaseConfigurationUtil.getHbaseConfiguration();
        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM));
        hbaseClientConf.set(
                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT));
        // add Hbase properties
        final Properties properties = getHbaseClientProperties(options);
        properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString()));
        return hbaseClientConf;
    }

    /**
     * Extracts all options prefixed with {@link #PROPERTIES_PREFIX}, with the prefix stripped
     * from the keys, so they can be applied verbatim to the Hbase client configuration.
     */
    private static Properties getHbaseClientProperties(Map<String, String> tableOptions) {
        final Properties hbaseProperties = new Properties();

        if (containsHbaseClientProperties(tableOptions)) {
            tableOptions.keySet().stream()
                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
                    .forEach(
                            key -> {
                                final String value = tableOptions.get(key);
                                final String subKey = key.substring((PROPERTIES_PREFIX).length());
                                hbaseProperties.put(subKey, value);
                            });
        }
        return hbaseProperties;
    }

    /** Returns whether any option key carries the {@link #PROPERTIES_PREFIX}. */
    private static boolean containsHbaseClientProperties(Map<String, String> tableOptions) {
        return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
    }

    // Static utility class; never instantiated.
    private HbaseConnectorOptionsUtil() {}
}


package connector.hbase.flink14base.util;

import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;


public class HbaseConfigurationUtil {

    private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.connector.hbase.util.HbaseConfigurationUtil.class);

    public static final String ENV_Hbase_CONF_DIR = "Hbase_CONF_DIR";

    public static Configuration getHbaseConfiguration() {

        // Instantiate an HbaseConfiguration to load the hbase-default.xml and hbase-site.xml from
        // the classpath.
        Configuration result = HbaseConfiguration.create();
        boolean foundHbaseConfiguration = false;

        // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration
        // The properties of a newly added resource will override the ones in previous resources, so
        // a configuration
        // file with higher priority should be added later.

        // Approach 1: Hbase_HOME environment variables
        String possibleHbaseConfPath = null;

        final String hbaseHome = System.getenv("Hbase_HOME");
        if (hbaseHome != null) {
            LOG.debug("Searching Hbase configuration files in Hbase_HOME: {}", hbaseHome);
            possibleHbaseConfPath = hbaseHome + "/conf";
        }

        if (possibleHbaseConfPath != null) {
            foundHbaseConfiguration = addHbaseConfIfFound(result, possibleHbaseConfPath);
        }

        // Approach 2: Hbase_CONF_DIR environment variable
        String hbaseConfDir = System.getenv("Hbase_CONF_DIR");
        if (hbaseConfDir != null) {
            LOG.debug("Searching Hbase configuration files in Hbase_CONF_DIR: {}", hbaseConfDir);
            foundHbaseConfiguration =
                    addHbaseConfIfFound(result, hbaseConfDir) || foundHbaseConfiguration;
        }

        if (!foundHbaseConfiguration) {
            LOG.warn(
                    "Could not find Hbase configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
        }

        return result;
    }

    
    private static boolean addHbaseConfIfFound(
            Configuration configuration, String possibleHbaseConfPath) {
        boolean foundHbaseConfiguration = false;
        if (new File(possibleHbaseConfPath).exists()) {
            if (new File(possibleHbaseConfPath + "/hbase-default.xml").exists()) {
                configuration.addResource(
                        new org.apache.hadoop.fs.Path(
                                possibleHbaseConfPath + "/hbase-default.xml"));
                LOG.debug(
                        "Adding "
                                + possibleHbaseConfPath
                                + "/hbase-default.xml to hbase configuration");
                foundHbaseConfiguration = true;
            }
            if (new File(possibleHbaseConfPath + "/hbase-site.xml").exists()) {
                configuration.addResource(
                        new org.apache.hadoop.fs.Path(possibleHbaseConfPath + "/hbase-site.xml"));
                LOG.debug(
                        "Adding "
                                + possibleHbaseConfPath
                                + "/hbase-site.xml to hbase configuration");
                foundHbaseConfiguration = true;
            }
        }
        return foundHbaseConfiguration;
    }

    
    public static byte[] serializeConfiguration(Configuration conf) {
        try {
            return serializeWritable(conf);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Encounter an IOException when serialize the Configuration.", e);
        }
    }

    
    public static Configuration deserializeConfiguration(
            byte[] serializedConfig, Configuration targetConfig) {
        if (null == targetConfig) {
            targetConfig = new Configuration();
        }
        try {
            deserializeWritable(targetConfig, serializedConfig);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Encounter an IOException when deserialize the Configuration.", e);
        }
        return targetConfig;
    }

    
    private static  byte[] serializeWritable(T writable) throws IOException {
        Preconditions.checkArgument(writable != null);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
        writable.write(outputStream);
        return byteArrayOutputStream.toByteArray();
    }

    
    private static  void deserializeWritable(T writable, byte[] bytes)
            throws IOException {
        Preconditions.checkArgument(writable != null);
        Preconditions.checkArgument(bytes != null);

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
        writable.readFields(dataInputStream);
    }

    public static Configuration createHbaseConf() {
        Configuration hbaseClientConf = HbaseConfiguration.create();

        String hbaseConfDir = System.getenv(ENV_Hbase_CONF_DIR);

        if (hbaseConfDir != null) {
            if (new File(hbaseConfDir).exists()) {
                String coreSite = hbaseConfDir + "/core-site.xml";
                String hdfsSite = hbaseConfDir + "/hdfs-site.xml";
                String hbaseSite = hbaseConfDir + "/hbase-site.xml";
                if (new File(coreSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(coreSite));
                    LOG.info("Adding " + coreSite + " to hbase configuration");
                }
                if (new File(hdfsSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hdfsSite));
                    LOG.info("Adding " + hdfsSite + " to hbase configuration");
                }
                if (new File(hbaseSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hbaseSite));
                    LOG.info("Adding " + hbaseSite + " to hbase configuration");
                }
            } else {
                LOG.warn(
                        "Hbase config directory '{}' not found, cannot load Hbase configuration.",
                        hbaseConfDir);
            }
        } else {
            LOG.warn(
                    "{} env variable not found, cannot load Hbase configuration.",
                    ENV_Hbase_CONF_DIR);
        }
        return hbaseClientConf;
    }
}


package connector.hbase.flink14base.util;


import org.apache.flink.table.data.*;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
import static org.apache.flink.util.Preconditions.checkArgument;


/**
 * Converts between Flink {@link RowData} and Hbase objects: builds {@link Put}/{@link Delete}
 * mutations and {@link Get}/{@link Scan} requests from rows, and decodes {@link Result}s back
 * into rows.
 *
 * <p>Schema layout: one atomic row key field plus one ROW-typed field per column family whose
 * nested fields are the family's qualifiers. Not thread-safe: the reused-row conversion paths
 * share mutable state.
 */
public class HbaseSerde {

    private static final byte[] EMPTY_BYTES = new byte[] {};

    // TIME/TIMESTAMP are stored as 4/8-byte integers, so only millisecond precision is supported.
    private static final int MIN_TIMESTAMP_PRECISION = 0;
    private static final int MAX_TIMESTAMP_PRECISION = 3;
    private static final int MIN_TIME_PRECISION = 0;
    private static final int MAX_TIME_PRECISION = 3;

    // Byte representation of the configured literal that stands in for NULL string values.
    private final byte[] nullStringBytes;

    // row key index in output row (-1 if the schema has no row key)
    private final int rowkeyIndex;

    // family keys
    private final byte[][] families;
    // qualifier keys, indexed [family][qualifier]
    private final byte[][][] qualifiers;

    // number of top-level fields: families plus the row key (when present)
    private final int fieldLength;

    // Rows reused across convertToReusedRow()/convertToRow() calls to avoid allocation.
    private GenericRowData reusedRow;
    private GenericRowData[] reusedFamilyRows;

    private final @Nullable FieldEncoder keyEncoder;
    private final @Nullable FieldDecoder keyDecoder;
    private final FieldEncoder[][] qualifierEncoders;
    private final FieldDecoder[][] qualifierDecoders;
    // Single-field scratch row used to encode a standalone row key in createGet().
    private final GenericRowData rowWithRowKey;

    /**
     * @param hbaseSchema schema describing row key, families and qualifiers
     * @param nullStringLiteral literal that represents NULL for string-typed columns
     */
    public HbaseSerde(HbaseTableSchema hbaseSchema, final String nullStringLiteral) {
        this.families = hbaseSchema.getFamilyKeys();
        this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
        LogicalType rowkeyType =
                hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);

        // field length need take row key into account if it exists.
        if (rowkeyIndex != -1 && rowkeyType != null) {
            this.fieldLength = families.length + 1;
            this.keyEncoder = createFieldEncoder(rowkeyType);
            this.keyDecoder = createFieldDecoder(rowkeyType);
        } else {
            this.fieldLength = families.length;
            this.keyEncoder = null;
            this.keyDecoder = null;
        }
        this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);

        // prepare output rows
        this.reusedRow = new GenericRowData(fieldLength);
        this.reusedFamilyRows = new GenericRowData[families.length];

        this.qualifiers = new byte[families.length][][];
        this.qualifierEncoders = new FieldEncoder[families.length][];
        this.qualifierDecoders = new FieldDecoder[families.length][];
        String[] familyNames = hbaseSchema.getFamilyNames();
        for (int f = 0; f < families.length; f++) {
            this.qualifiers[f] = hbaseSchema.getQualifierKeys(familyNames[f]);
            DataType[] dataTypes = hbaseSchema.getQualifierDataTypes(familyNames[f]);
            this.qualifierEncoders[f] =
                    Arrays.stream(dataTypes)
                            .map(DataType::getLogicalType)
                            .map(t -> createNullableFieldEncoder(t, nullStringBytes))
                            .toArray(FieldEncoder[]::new);
            this.qualifierDecoders[f] =
                    Arrays.stream(dataTypes)
                            .map(DataType::getLogicalType)
                            .map(t -> createNullableFieldDecoder(t, nullStringBytes))
                            .toArray(FieldDecoder[]::new);
            this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length);
        }
        this.rowWithRowKey = new GenericRowData(1);
    }

    /**
     * Builds an upsert {@link Put} from the row, or null when the encoded row key is empty
     * (such dirty records are dropped).
     */
    public @Nullable Put createPutMutation(RowData row) {
        checkArgument(keyEncoder != null, "row key is not set.");
        byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
        if (rowkey.length == 0) {
            // drop dirty records, rowkey shouldn't be zero length
            return null;
        }
        // upsert
        Put put = new Put(rowkey);
        for (int i = 0; i < fieldLength; i++) {
            if (i != rowkeyIndex) {
                // map top-level field index to family index, skipping the row key position
                int f = i > rowkeyIndex ? i - 1 : i;
                // get family key
                byte[] familyKey = families[f];
                RowData familyRow = row.getRow(i, qualifiers[f].length);
                for (int q = 0; q < this.qualifiers[f].length; q++) {
                    // get quantifier key
                    byte[] qualifier = qualifiers[f][q];
                    // serialize value
                    byte[] value = qualifierEncoders[f][q].encode(familyRow, q);
                    put.addColumn(familyKey, qualifier, value);
                }
            }
        }
        return put;
    }

    /**
     * Builds a {@link Delete} covering every mapped column of the row, or null when the encoded
     * row key is empty.
     */
    public @Nullable Delete createDeleteMutation(RowData row) {
        checkArgument(keyEncoder != null, "row key is not set.");
        byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
        if (rowkey.length == 0) {
            // drop dirty records, rowkey shouldn't be zero length
            return null;
        }
        // delete
        Delete delete = new Delete(rowkey);
        for (int i = 0; i < fieldLength; i++) {
            if (i != rowkeyIndex) {
                int f = i > rowkeyIndex ? i - 1 : i;
                // get family key
                byte[] familyKey = families[f];
                for (int q = 0; q < this.qualifiers[f].length; q++) {
                    // get quantifier key
                    byte[] qualifier = qualifiers[f][q];
                    delete.addColumn(familyKey, qualifier);
                }
            }
        }
        return delete;
    }

    /** Creates a {@link Scan} selecting exactly the columns mapped by the schema. */
    public Scan createScan() {
        Scan scan = new Scan();
        for (int f = 0; f < families.length; f++) {
            byte[] family = families[f];
            for (int q = 0; q < qualifiers[f].length; q++) {
                byte[] quantifier = qualifiers[f][q];
                scan.addColumn(family, quantifier);
            }
        }
        return scan;
    }

    /**
     * Creates a {@link Get} for the given row key value (Flink internal representation), or
     * null when the encoded key is empty.
     */
    public Get createGet(Object rowKey) {
        checkArgument(keyEncoder != null, "row key is not set.");
        rowWithRowKey.setField(0, rowKey);
        byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
        if (rowkey.length == 0) {
            // drop dirty records, rowkey shouldn't be zero length
            return null;
        }
        Get get = new Get(rowkey);
        for (int f = 0; f < families.length; f++) {
            byte[] family = families[f];
            for (byte[] qualifier : qualifiers[f]) {
                get.addColumn(family, qualifier);
            }
        }
        return get;
    }

    /**
     * Converts a {@link Result} into a freshly allocated row — safe when downstream operators
     * cache or buffer the produced records.
     */
    public RowData convertToNewRow(Result result) {
        // The output rows needs to be initialized each time
        // to prevent the possibility of putting the output object into the cache.
        GenericRowData resultRow = new GenericRowData(fieldLength);
        GenericRowData[] familyRows = new GenericRowData[families.length];
        for (int f = 0; f < families.length; f++) {
            familyRows[f] = new GenericRowData(qualifiers[f].length);
        }
        return convertToRow(result, resultRow, familyRows);
    }

    /**
     * Converts a {@link Result} into the shared reused row — the returned object is
     * overwritten by the next call, so the caller must consume it immediately.
     */
    public RowData convertToReusedRow(Result result) {
        return convertToRow(result, reusedRow, reusedFamilyRows);
    }

    // Common decode path: fills resultRow (and per-family nested rows) from the Result.
    private RowData convertToRow(
            Result result, GenericRowData resultRow, GenericRowData[] familyRows) {
        for (int i = 0; i < fieldLength; i++) {
            if (rowkeyIndex == i) {
                assert keyDecoder != null;
                Object rowkey = keyDecoder.decode(result.getRow());
                resultRow.setField(rowkeyIndex, rowkey);
            } else {
                int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
                // get family key
                byte[] familyKey = families[f];
                GenericRowData familyRow = familyRows[f];
                for (int q = 0; q < this.qualifiers[f].length; q++) {
                    // get quantifier key
                    byte[] qualifier = qualifiers[f][q];
                    // read value
                    byte[] value = result.getValue(familyKey, qualifier);
                    familyRow.setField(q, qualifierDecoders[f][q].decode(value));
                }
                resultRow.setField(i, familyRow);
            }
        }
        return resultRow;
    }

    /** @deprecated use {@link #convertToReusedRow(Result)} instead; identical behavior. */
    @Deprecated
    public RowData convertToRow(Result result) {
        for (int i = 0; i < fieldLength; i++) {
            if (rowkeyIndex == i) {
                assert keyDecoder != null;
                Object rowkey = keyDecoder.decode(result.getRow());
                reusedRow.setField(rowkeyIndex, rowkey);
            } else {
                int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
                // get family key
                byte[] familyKey = families[f];
                GenericRowData familyRow = reusedFamilyRows[f];
                for (int q = 0; q < this.qualifiers[f].length; q++) {
                    // get quantifier key
                    byte[] qualifier = qualifiers[f][q];
                    // read value
                    byte[] value = result.getValue(familyKey, qualifier);
                    familyRow.setField(q, qualifierDecoders[f][q].decode(value));
                }
                reusedRow.setField(i, familyRow);
            }
        }
        return reusedRow;
    }

    // ------------------------------------------------------------------------------------
    // Hbase Runtime Encoders
    // ------------------------------------------------------------------------------------

    /** Serializes one field of a row into Hbase bytes. */
    @FunctionalInterface
    private interface FieldEncoder extends Serializable {
        byte[] encode(RowData row, int pos);
    }

    // Wraps an encoder with NULL handling: strings encode NULL as the configured literal
    // (Hbase can legitimately store empty bytes for strings), other types as empty bytes.
    private static FieldEncoder createNullableFieldEncoder(
            LogicalType fieldType, final byte[] nullStringBytes) {
        final FieldEncoder encoder = createFieldEncoder(fieldType);
        if (fieldType.isNullable()) {
            if (hasFamily(fieldType, LogicalTypeFamily.CHARACTER_STRING)) {
                // special logic for null string values, because Hbase can store empty bytes for
                // string
                return (row, pos) -> {
                    if (row.isNullAt(pos)) {
                        return nullStringBytes;
                    } else {
                        return encoder.encode(row, pos);
                    }
                };
            } else {
                // encode empty bytes for null values
                return (row, pos) -> {
                    if (row.isNullAt(pos)) {
                        return EMPTY_BYTES;
                    } else {
                        return encoder.encode(row, pos);
                    }
                };
            }
        } else {
            return encoder;
        }
    }

    // Selects the byte encoding for a non-null field of the given logical type.
    private static FieldEncoder createFieldEncoder(LogicalType fieldType) {
        // ordered by type root definition
        switch (fieldType.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                // get the underlying UTF-8 bytes
                return (row, pos) -> row.getString(pos).toBytes();
            case BOOLEAN:
                return (row, pos) -> Bytes.toBytes(row.getBoolean(pos));
            case BINARY:
            case VARBINARY:
                // fixed: the accessor lives on RowData (was mangled as "Rowdata")
                return RowData::getBinary;
            case DECIMAL:
                return createDecimalEncoder((DecimalType) fieldType);
            case TINYINT:
                return (row, pos) -> new byte[] {row.getByte(pos)};
            case SMALLINT:
                return (row, pos) -> Bytes.toBytes(row.getShort(pos));
            case INTEGER:
            case DATE:
            case INTERVAL_YEAR_MONTH:
                return (row, pos) -> Bytes.toBytes(row.getInt(pos));
            case TIME_WITHOUT_TIME_ZONE:
                final int timePrecision = getPrecision(fieldType);
                if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
                }
                return (row, pos) -> Bytes.toBytes(row.getInt(pos));
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return (row, pos) -> Bytes.toBytes(row.getLong(pos));
            case FLOAT:
                return (row, pos) -> Bytes.toBytes(row.getFloat(pos));
            case DOUBLE:
                return (row, pos) -> Bytes.toBytes(row.getDouble(pos));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int timestampPrecision = getPrecision(fieldType);
                if (timestampPrecision < MIN_TIMESTAMP_PRECISION
                        || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timestampPrecision,
                                    MIN_TIMESTAMP_PRECISION,
                                    MAX_TIMESTAMP_PRECISION));
                }
                return createTimestampEncoder(timestampPrecision);
            default:
                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
        }
    }

    // DECIMAL is stored via HBase's BigDecimal byte encoding.
    private static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return (row, pos) -> {
            BigDecimal decimal = row.getDecimal(pos, precision, scale).toBigDecimal();
            return Bytes.toBytes(decimal);
        };
    }

    // Timestamps are stored as epoch milliseconds (8-byte long), losing sub-millisecond digits.
    private static FieldEncoder createTimestampEncoder(final int precision) {
        return (row, pos) -> {
            long millisecond = row.getTimestamp(pos, precision).getMillisecond();
            return Bytes.toBytes(millisecond);
        };
    }

    // ------------------------------------------------------------------------------------
    // Hbase Runtime Decoders
    // ------------------------------------------------------------------------------------

    /** Deserializes Hbase bytes into a Flink internal field value (null allowed). */
    @FunctionalInterface
    private interface FieldDecoder extends Serializable {
        @Nullable
        Object decode(byte[] value);
    }

    // Mirrors createNullableFieldEncoder: the null-string literal (strings) or empty/missing
    // bytes (other types) decode to NULL.
    private static FieldDecoder createNullableFieldDecoder(
            LogicalType fieldType, final byte[] nullStringBytes) {
        final FieldDecoder decoder = createFieldDecoder(fieldType);
        if (fieldType.isNullable()) {
            if (hasFamily(fieldType, LogicalTypeFamily.CHARACTER_STRING)) {
                return value -> {
                    if (value == null || Arrays.equals(value, nullStringBytes)) {
                        return null;
                    } else {
                        return decoder.decode(value);
                    }
                };
            } else {
                return value -> {
                    if (value == null || value.length == 0) {
                        return null;
                    } else {
                        return decoder.decode(value);
                    }
                };
            }
        } else {
            return decoder;
        }
    }

    // Selects the byte decoding for a non-null field of the given logical type.
    private static FieldDecoder createFieldDecoder(LogicalType fieldType) {
        // ordered by type root definition
        switch (fieldType.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                // reuse bytes; fixed: Flink's internal string type is StringData (was "Stringdata")
                return StringData::fromBytes;
            case BOOLEAN:
                return Bytes::toBoolean;
            case BINARY:
            case VARBINARY:
                return value -> value;
            case DECIMAL:
                return createDecimalDecoder((DecimalType) fieldType);
            case TINYINT:
                return value -> value[0];
            case SMALLINT:
                return Bytes::toShort;
            case INTEGER:
            case DATE:
            case INTERVAL_YEAR_MONTH:
                return Bytes::toInt;
            case TIME_WITHOUT_TIME_ZONE:
                final int timePrecision = getPrecision(fieldType);
                if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
                }
                return Bytes::toInt;
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return Bytes::toLong;
            case FLOAT:
                return Bytes::toFloat;
            case DOUBLE:
                return Bytes::toDouble;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int timestampPrecision = getPrecision(fieldType);
                if (timestampPrecision < MIN_TIMESTAMP_PRECISION
                        || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timestampPrecision,
                                    MIN_TIMESTAMP_PRECISION,
                                    MAX_TIMESTAMP_PRECISION));
                }
                return createTimestampDecoder();
            default:
                throw new UnsupportedOperationException("Unsupported type: " + fieldType);
        }
    }

    private static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return value -> {
            BigDecimal decimal = Bytes.toBigDecimal(value);
            return DecimalData.fromBigDecimal(decimal, precision, scale);
        };
    }

    private static FieldDecoder createTimestampDecoder() {
        return value -> {
            // TODO: support higher precision
            long milliseconds = Bytes.toLong(value);
            return TimestampData.fromEpochMillis(milliseconds);
        };
    }
}


package connector.hbase.flink14base.util;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;


public class HbaseTableSchema implements Serializable {

    private static final long serialVersionUID = 1L;

    // A Map with key as column family.
    private final Map> familyMap = new linkedHashMap<>();

    // information about rowkey
    private RowKeyInfo rowKeyInfo;

    // charset to parse Hbase keys and strings. UTF-8 by default.
    private String charset = "UTF-8";

    
    public void addColumn(String family, String qualifier, Class clazz) {
        Preconditions.checkNotNull(clazz, "class type");
        DataType type = TypeConversions.fromLegacyInfoToDataType(TypeExtractor.getForClass(clazz));
        addColumn(family, qualifier, type);
    }

    public void addColumn(String family, String qualifier, DataType type) {
        Preconditions.checkNotNull(family, "family name");
        Preconditions.checkNotNull(qualifier, "qualifier name");
        Preconditions.checkNotNull(type, "data type");
        Map qualifierMap = this.familyMap.get(family);

        if (!HbaseTypeUtils.isSupportedType(type.getLogicalType())) {
            // throw exception
            throw new IllegalArgumentException(
                    "Unsupported class type found "
                            + type
                            + ". "
                            + "Better to use byte[].class and deserialize using user defined scalar functions");
        }

        if (qualifierMap == null) {
            qualifierMap = new linkedHashMap<>();
        }
        qualifierMap.put(qualifier, type);
        familyMap.put(family, qualifierMap);
    }

    
    public void setRowKey(String rowKeyName, Class clazz) {
        Preconditions.checkNotNull(clazz, "row key class type");
        DataType type = TypeConversions.fromLegacyInfoToDataType(TypeExtractor.getForClass(clazz));
        setRowKey(rowKeyName, type);
    }

    public void setRowKey(String rowKeyName, DataType type) {
        Preconditions.checkNotNull(rowKeyName, "row key field name");
        Preconditions.checkNotNull(type, "row key data type");
        if (!HbaseTypeUtils.isSupportedType(type.getLogicalType())) {
            // throw exception
            throw new IllegalArgumentException(
                    "Unsupported class type found "
                            + type
                            + ". "
                            + "Better to use byte[].class and deserialize using user defined scalar functions");
        }
        if (rowKeyInfo != null) {
            throw new IllegalArgumentException("Row key can't be set multiple times.");
        }
        this.rowKeyInfo = new RowKeyInfo(rowKeyName, type, familyMap.size());
    }

    
    public void setCharset(String charset) {
        this.charset = charset;
    }

    
    public String[] getFamilyNames() {
        return this.familyMap.keySet().toArray(new String[0]);
    }

    
    public byte[][] getFamilyKeys() {
        Charset c = Charset.forName(charset);

        byte[][] familyKeys = new byte[this.familyMap.size()][];
        int i = 0;
        for (String name : this.familyMap.keySet()) {
            familyKeys[i++] = name.getBytes(c);
        }
        return familyKeys;
    }

    
    public String[] getQualifierNames(String family) {
        Map qualifierMap = familyMap.get(family);

        if (qualifierMap == null) {
            throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
        }

        String[] qualifierNames = new String[qualifierMap.size()];
        int i = 0;
        for (String qualifier : qualifierMap.keySet()) {
            qualifierNames[i] = qualifier;
            i++;
        }
        return qualifierNames;
    }

    
    public byte[][] getQualifierKeys(String family) {
        Map qualifierMap = familyMap.get(family);

        if (qualifierMap == null) {
            throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
        }
        Charset c = Charset.forName(charset);

        byte[][] qualifierKeys = new byte[qualifierMap.size()][];
        int i = 0;
        for (String name : qualifierMap.keySet()) {
            qualifierKeys[i++] = name.getBytes(c);
        }
        return qualifierKeys;
    }

    
    public TypeInformation[] getQualifierTypes(String family) {
        DataType[] dataTypes = getQualifierDataTypes(family);
        return Arrays.stream(dataTypes)
                .map(TypeConversions::fromDataTypeToLegacyInfo)
                .toArray(TypeInformation[]::new);
    }

    public DataType[] getQualifierDataTypes(String family) {
        Map qualifierMap = familyMap.get(family);

        if (qualifierMap == null) {
            throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
        }

        DataType[] dataTypes = new DataType[qualifierMap.size()];
        int i = 0;
        for (DataType dataType : qualifierMap.values()) {
            dataTypes[i] = dataType;
            i++;
        }
        return dataTypes;
    }

    
    private Map getFamilyInfo(String family) {
        return familyMap.get(family);
    }

    
    public String getStringCharset() {
        return this.charset;
    }

    
    public int getRowKeyIndex() {
        return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex;
    }

    
    public Optional> getRowKeyTypeInfo() {
        return rowKeyInfo == null
                ? Optional.empty()
                : Optional.of(TypeConversions.fromDataTypeToLegacyInfo(rowKeyInfo.rowKeyType));
    }

    public Optional getRowKeyDataType() {
        return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyType);
    }

    
    public Optional getRowKeyName() {
        return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyName);
    }

    
    public HbaseTableSchema getProjectedHbaseTableSchema(int[] projectedFields) {
        if (projectedFields == null) {
            return this;
        }
        HbaseTableSchema newSchema = new HbaseTableSchema();
        String[] fieldNames = convertsToTableSchema().getFieldNames();
        for (int projectedField : projectedFields) {
            String name = fieldNames[projectedField];
            if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) {
                newSchema.setRowKey(rowKeyInfo.rowKeyName, rowKeyInfo.rowKeyType);
            } else {
                Map familyInfo = getFamilyInfo(name);
                for (Map.Entry entry : familyInfo.entrySet()) {
                    // create the newSchema
                    String qualifier = entry.getKey();
                    newSchema.addColumn(name, qualifier, entry.getValue());
                }
            }
        }
        newSchema.setCharset(charset);
        return newSchema;
    }

    
    public TableSchema convertsToTableSchema() {
        String[] familyNames = getFamilyNames();
        if (rowKeyInfo != null) {
            String[] fieldNames = new String[familyNames.length + 1];
            DataType[] fieldTypes = new DataType[familyNames.length + 1];
            for (int i = 0; i < fieldNames.length; i++) {
                if (i == rowKeyInfo.rowKeyIndex) {
                    fieldNames[i] = rowKeyInfo.rowKeyName;
                    fieldTypes[i] = rowKeyInfo.rowKeyType;
                } else {
                    int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
                    String family = familyNames[familyIndex];
                    fieldNames[i] = family;
                    fieldTypes[i] =
                            getRowDataType(
                                    getQualifierNames(family), getQualifierDataTypes(family));
                }
            }
            return TableSchema.builder().fields(fieldNames, fieldTypes).build();
        } else {
            String[] fieldNames = new String[familyNames.length];
            DataType[] fieldTypes = new DataType[familyNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                String family = familyNames[i];
                fieldNames[i] = family;
                fieldTypes[i] =
                        getRowDataType(getQualifierNames(family), getQualifierDataTypes(family));
            }
            return TableSchema.builder().fields(fieldNames, fieldTypes).build();
        }
    }

    
    private static DataType getRowDataType(String[] fieldNames, DataType[] fieldTypes) {
        final DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
        for (int j = 0; j < fieldNames.length; j++) {
            fields[j] = DataTypes.FIELD(fieldNames[j], fieldTypes[j]);
        }
        return DataTypes.ROW(fields);
    }

    
    public static HbaseTableSchema fromTableSchema(TableSchema schema) {
        HbaseTableSchema hbaseSchema = new HbaseTableSchema();
        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
        for (RowType.RowField field : rowType.getFields()) {
            LogicalType fieldType = field.getType();
            if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
                RowType familyType = (RowType) fieldType;
                String familyName = field.getName();
                for (RowType.RowField qualifier : familyType.getFields()) {
                    hbaseSchema.addColumn(
                            familyName,
                            qualifier.getName(),
                            fromLogicalToDataType(qualifier.getType()));
                }
            } else if (fieldType.getChildren().size() == 0) {
                hbaseSchema.setRowKey(field.getName(), fromLogicalToDataType(fieldType));
            } else {
                throw new IllegalArgumentException(
                        "Unsupported field type '" + fieldType + "' for Hbase.");
            }
        }
        return hbaseSchema;
    }

    // ------------------------------------------------------------------------------------

    
    private static class RowKeyInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String rowKeyName;
        final DataType rowKeyType;
        final int rowKeyIndex;

        RowKeyInfo(String rowKeyName, DataType rowKeyType, int rowKeyIndex) {
            this.rowKeyName = rowKeyName;
            this.rowKeyType = rowKeyType;
            this.rowKeyIndex = rowKeyIndex;
        }
    }
}


package connector.hbase.flink14base.util;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hbase.util.Bytes;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;


/**
 * Static utilities for (de)serializing between HBase {@code byte[]} cell values and Java/Flink
 * types, plus support checks for the connector's type coverage.
 *
 * <p>Restored the {@code <?>} generic bounds that were stripped during extraction (raw
 * {@code TypeInformation}/{@code Class} otherwise trigger unchecked warnings).
 */
public class HbaseTypeUtils {

    private static final byte[] EMPTY_BYTES = new byte[] {};

    // Timestamps and times are stored as epoch milliseconds (a long),
    // so only precision 0..3 is representable.
    private static final int MIN_TIMESTAMP_PRECISION = 0;
    private static final int MAX_TIMESTAMP_PRECISION = 3;
    private static final int MIN_TIME_PRECISION = 0;
    private static final int MAX_TIME_PRECISION = 3;

    /** Utility class: not meant to be instantiated. */
    private HbaseTypeUtils() {}

    /**
     * Deserializes an HBase cell value to the Java object selected by the type index
     * (see {@link #getTypeIndex(Class)} for the index mapping).
     *
     * @param value the raw cell bytes
     * @param typeIdx index of the target type
     * @param stringCharset charset used when decoding Strings
     * @throws IllegalArgumentException on an unknown type index
     */
    public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
        switch (typeIdx) {
            case 0: // byte[]
                return value;
            case 1: // String; empty bytes are decoded as null
                return Arrays.equals(EMPTY_BYTES, value) ? null : new String(value, stringCharset);
            case 2: // byte
                return value[0];
            case 3:
                return Bytes.toShort(value);
            case 4:
                return Bytes.toInt(value);
            case 5:
                return Bytes.toLong(value);
            case 6:
                return Bytes.toFloat(value);
            case 7:
                return Bytes.toDouble(value);
            case 8:
                return Bytes.toBoolean(value);
            case 9: // sql.Timestamp encoded as long
                return new Timestamp(Bytes.toLong(value));
            case 10: // sql.Date encoded as long
                return new Date(Bytes.toLong(value));
            case 11: // sql.Time encoded as long
                return new Time(Bytes.toLong(value));
            case 12:
                return Bytes.toBigDecimal(value);
            case 13:
                return new BigInteger(value);

            default:
                throw new IllegalArgumentException("unsupported type index:" + typeIdx);
        }
    }

    /**
     * Serializes a Java object to the HBase byte[] representation selected by the type index.
     *
     * <p>NOTE(review): only the String and byte cases map null to EMPTY_BYTES; the remaining
     * cases NPE on a null value — presumably callers never pass null there (confirm upstream).
     *
     * @throws IllegalArgumentException on an unknown type index
     */
    public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) {
        switch (typeIdx) {
            case 0: // byte[]
                return (byte[]) value;
            case 1: // external String
                return value == null ? EMPTY_BYTES : ((String) value).getBytes(stringCharset);
            case 2: // byte
                return value == null ? EMPTY_BYTES : new byte[] {(byte) value};
            case 3:
                return Bytes.toBytes((short) value);
            case 4:
                return Bytes.toBytes((int) value);
            case 5:
                return Bytes.toBytes((long) value);
            case 6:
                return Bytes.toBytes((float) value);
            case 7:
                return Bytes.toBytes((double) value);
            case 8:
                return Bytes.toBytes((boolean) value);
            case 9: // sql.Timestamp encoded to Long
                return Bytes.toBytes(((Timestamp) value).getTime());
            case 10: // sql.Date encoded as long
                return Bytes.toBytes(((Date) value).getTime());
            case 11: // sql.Time encoded as long
                return Bytes.toBytes(((Time) value).getTime());
            case 12:
                return Bytes.toBytes((BigDecimal) value);
            case 13:
                return ((BigInteger) value).toByteArray();

            default:
                throw new IllegalArgumentException("unsupported type index:" + typeIdx);
        }
    }

    /** Returns the internal type index for a legacy {@link TypeInformation}. */
    public static int getTypeIndex(TypeInformation<?> typeInfo) {
        return getTypeIndex(typeInfo.getTypeClass());
    }

    /** Returns true if the given class is supported by the HBase connector. */
    public static boolean isSupportedType(Class<?> clazz) {
        return getTypeIndex(clazz) != -1;
    }

    /** Maps a Java class to its internal type index, or -1 if unsupported. */
    private static int getTypeIndex(Class<?> clazz) {
        if (byte[].class.equals(clazz)) {
            return 0;
        } else if (String.class.equals(clazz)) {
            return 1;
        } else if (Byte.class.equals(clazz)) {
            return 2;
        } else if (Short.class.equals(clazz)) {
            return 3;
        } else if (Integer.class.equals(clazz)) {
            return 4;
        } else if (Long.class.equals(clazz)) {
            return 5;
        } else if (Float.class.equals(clazz)) {
            return 6;
        } else if (Double.class.equals(clazz)) {
            return 7;
        } else if (Boolean.class.equals(clazz)) {
            return 8;
        } else if (Timestamp.class.equals(clazz)) {
            return 9;
        } else if (Date.class.equals(clazz)) {
            return 10;
        } else if (Time.class.equals(clazz)) {
            return 11;
        } else if (BigDecimal.class.equals(clazz)) {
            return 12;
        } else if (BigInteger.class.equals(clazz)) {
            return 13;
        } else {
            return -1;
        }
    }

    /**
     * Returns true if the given logical type is supported by the HBase connector; validates the
     * precision of TIME/TIMESTAMP types against the connector's millisecond-based encoding.
     *
     * @throws UnsupportedOperationException if a TIME/TIMESTAMP precision is out of range
     */
    public static boolean isSupportedType(LogicalType type) {
        // ordered by type root definition
        switch (type.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
            case BOOLEAN:
            case BINARY:
            case VARBINARY:
            case DECIMAL:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case DATE:
            case INTERVAL_YEAR_MONTH:
            case BIGINT:
            case INTERVAL_DAY_TIME:
            case FLOAT:
            case DOUBLE:
                return true;
            case TIME_WITHOUT_TIME_ZONE:
                final int timePrecision = getPrecision(type);
                if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
                }
                return true;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int timestampPrecision = getPrecision(type);
                if (timestampPrecision < MIN_TIMESTAMP_PRECISION
                        || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
                    throw new UnsupportedOperationException(
                            String.format(
                                    "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                            + "Hbase connector",
                                    timestampPrecision,
                                    MIN_TIMESTAMP_PRECISION,
                                    MAX_TIMESTAMP_PRECISION));
                }
                return true;
            case TIMESTAMP_WITH_TIME_ZONE:
            case ARRAY:
            case MULTISET:
            case MAP:
            case ROW:
            case STRUCTURED_TYPE:
            case DISTINCT_TYPE:
            case RAW:
            case NULL:
            case SYMBOL:
            case UNRESOLVED:
                return false;
            default:
                throw new IllegalArgumentException();
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/344624.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号