package connector.hbase.flink14base.sink;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@Internal
public class HbaseSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {

    private static final long serialVersionUID = 1L;

    // Logger must use this class's literal; the upstream Flink connector class referenced
    // before is not on this project's classpath and would not compile.
    private static final Logger LOG = LoggerFactory.getLogger(HbaseSinkFunction.class);

    private final String hTableName;
    // org.apache.hadoop.conf.Configuration is not serializable, so it travels as bytes.
    private final byte[] serializedConfig;
    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private final HbaseMutationConverter<T> mutationConverter;

    private transient Connection connection;
    private transient BufferedMutator mutator;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private transient volatile boolean closed = false;

    // Records the first failure seen by the async flusher or the HBase exception listener;
    // it is rethrown on the next invoke()/flush() so the task fails fast.
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /**
     * Creates a buffering HBase sink that flushes by size, mutation count, or time interval.
     *
     * @param hTableName name of the target HBase table
     * @param conf HBase client configuration; serialized here for shipping to task managers
     * @param mutationConverter converts incoming records into HBase {@code Mutation}s
     * @param bufferFlushMaxSizeInBytes write-buffer size triggering a flush; {@code <= 0} disables
     * @param bufferFlushMaxMutations mutation count triggering a flush; {@code <= 0} disables
     * @param bufferFlushIntervalMillis periodic flush interval; {@code <= 0} disables
     */
    public HbaseSinkFunction(
            String hTableName,
            org.apache.hadoop.conf.Configuration conf,
            HbaseMutationConverter<T> mutationConverter,
            long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations,
            long bufferFlushIntervalMillis) {
        this.hTableName = hTableName;
        // Configuration is not serializable
        this.serializedConfig = HbaseConfigurationUtil.serializeConfiguration(conf);
        this.mutationConverter = mutationConverter;
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0);
            if (null == connection) {
                this.connection = ConnectionFactory.createConnection(config);
            }
            // create a parameter instance, set the table name and custom listener reference.
            BufferedMutatorParams params =
                    new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
            if (bufferFlushMaxSizeInBytes > 0) {
                params.writeBufferSize(bufferFlushMaxSizeInBytes);
            }
            this.mutator = connection.getBufferedMutator(params);
            // A flusher thread is pointless when every single mutation triggers a flush anyway.
            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
                this.executor =
                        Executors.newScheduledThreadPool(
                                1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture =
                        this.executor.scheduleWithFixedDelay(
                                () -> {
                                    if (closed) {
                                        return;
                                    }
                                    try {
                                        flush();
                                    } catch (Exception e) {
                                        // fail the sink and skip the rest of the items
                                        // if the failure handler decides to throw an exception
                                        failureThrowable.compareAndSet(null, e);
                                    }
                                },
                                bufferFlushIntervalMillis,
                                bufferFlushIntervalMillis,
                                TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + hTableName + " not found ", tnfe);
            throw new RuntimeException("Hbase table '" + hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to Hbase.", ioe);
            throw new RuntimeException("Cannot create connection to Hbase.", ioe);
        }
        LOG.info("end open.");
    }

    /**
     * Builds the runtime HBase configuration: classpath defaults (hbase-site.xml) overridden
     * by the client-side configuration serialized into this function; client-side options win.
     *
     * @throws IOException if the final configuration lacks the ZooKeeper quorum
     */
    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        org.apache.hadoop.conf.Configuration runtimeConfig =
                HbaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HbaseConfigurationUtil.getHbaseConfiguration());
        // do validation: check key option(s) in final runtime configuration
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "Can not connect to Hbase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IOException(
                    "Check Hbase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }
        return runtimeConfig;
    }

    // Rethrows (wrapped) the first recorded asynchronous failure, if any.
    private void checkErrorAndRethrow() {
        Throwable cause = failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in HbaseSink.", cause);
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        checkErrorAndRethrow();
        mutator.mutate(mutationConverter.convertToMutation(value));
        // flush when the buffer number of mutations greater than the configured max size.
        if (bufferFlushMaxMutations > 0
                && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }

    private void flush() throws IOException {
        // BufferedMutator is thread-safe
        mutator.flush();
        numPendingRequests.set(0);
        checkErrorAndRethrow();
    }

    @Override
    public void close() throws Exception {
        closed = true;
        if (mutator != null) {
            try {
                // BufferedMutator.close() flushes any remaining buffered mutations.
                mutator.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase BufferedMutator.", e);
            }
            this.mutator = null;
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing Hbase Connection.", e);
            }
            this.connection = null;
        }
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Drain all pending mutations so the checkpoint reflects every record received so far.
        while (numPendingRequests.get() != 0) {
            flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to do.
    }

    @Override
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
            throws RetriesExhaustedWithDetailsException {
        // fail the sink and skip the rest of the items
        // if the failure handler decides to throw an exception
        failureThrowable.compareAndSet(null, exception);
    }
}
package connector.hbase.flink14base.sink;
import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.hbase.client.Mutation;
/** Converts changelog {@link RowData} records into HBase {@link Mutation}s via {@link HbaseSerde}. */
public class RowDataToMutationConverter implements HbaseMutationConverter<RowData> {

    private static final long serialVersionUID = 1L;

    private final HbaseTableSchema schema;
    // Literal representing NULL for string fields (passed through to HbaseSerde).
    private final String nullStringLiteral;
    // HbaseSerde is built lazily in open() because it is not serializable.
    private transient HbaseSerde serde;

    public RowDataToMutationConverter(HbaseTableSchema schema, final String nullStringLiteral) {
        this.schema = schema;
        this.nullStringLiteral = nullStringLiteral;
    }

    /** Initializes the serde; must be called once before {@link #convertToMutation(RowData)}. */
    @Override
    public void open() {
        this.serde = new HbaseSerde(schema, nullStringLiteral);
    }

    /**
     * Maps a changelog record to a mutation: INSERT / UPDATE_AFTER become a Put,
     * everything else (UPDATE_BEFORE / DELETE) becomes a Delete.
     */
    @Override
    public Mutation convertToMutation(RowData record) {
        RowKind kind = record.getRowKind();
        if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
            return serde.createPutMutation(record);
        } else {
            return serde.createDeleteMutation(record);
        }
    }
}
package connector.hbase.flink14base.source;
import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.conf.Configuration;
import static org.apache.flink.util.Preconditions.checkArgument;
@Internal
public abstract class AbstractHbaseDynamicTableSource
        implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {

    protected final Configuration conf;
    protected final String tableName;
    // Mutable because applyProjection() replaces it with a projected schema.
    protected HbaseTableSchema hbaseSchema;
    protected final String nullStringLiteral;
    protected final HbaseLookupOptions lookupOptions;

    /**
     * @param conf HBase client configuration
     * @param tableName name of the HBase table to read
     * @param hbaseSchema mapping between the table schema and HBase families/qualifiers
     * @param nullStringLiteral literal representing NULL for string fields
     * @param lookupOptions cache/retry options used in lookup mode
     */
    public AbstractHbaseDynamicTableSource(
            Configuration conf,
            String tableName,
            HbaseTableSchema hbaseSchema,
            String nullStringLiteral,
            HbaseLookupOptions lookupOptions) {
        this.conf = conf;
        this.tableName = tableName;
        this.hbaseSchema = hbaseSchema;
        this.nullStringLiteral = nullStringLiteral;
        this.lookupOptions = lookupOptions;
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        return InputFormatProvider.of(getInputFormat());
    }

    /** Supplies the concrete input format used for scan (batch-style) reads. */
    protected abstract InputFormat<RowData, ?> getInputFormat();

    /**
     * Validates that the lookup key is the single row key field and builds the lookup function.
     *
     * @throws IllegalArgumentException if the lookup key is not exactly the row key field
     */
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        checkArgument(
                context.getKeys().length == 1 && context.getKeys()[0].length == 1,
                "Currently, Hbase table can only be lookup by single rowkey.");
        checkArgument(
                hbaseSchema.getRowKeyName().isPresent(),
                "Hbase schema must have a row key when used in lookup mode.");
        checkArgument(
                hbaseSchema
                        .convertsToTableSchema()
                        .getTableColumn(context.getKeys()[0][0])
                        .filter(f -> f.getName().equals(hbaseSchema.getRowKeyName().get()))
                        .isPresent(),
                "Currently, Hbase table only supports lookup by rowkey field.");
        return TableFunctionProvider.of(
                new HbaseRowDataLookupFunction(
                        conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions));
    }

    @Override
    public boolean supportsNestedProjection() {
        // planner doesn't support nested projection push down yet.
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        TableSchema projectSchema =
                TableSchemaUtils.projectSchema(
                        hbaseSchema.convertsToTableSchema(), projectedFields);
        this.hbaseSchema = HbaseTableSchema.fromTableSchema(projectSchema);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // This source only reads snapshots, so the produced changelog is insert-only.
        return ChangelogMode.insertOnly();
    }

    @Override
    public String asSummaryString() {
        return "Hbase";
    }

    // -------------------------------------------------------------------------------------------

    @VisibleForTesting
    public HbaseTableSchema getHbaseTableSchema() {
        return this.hbaseSchema;
    }
}
package connector.hbase.flink14base.source;
import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import connector.hbase.flink14base.util.HbaseSerde;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Internal
public class HbaseRowDataLookupFunction extends TableFunction {
private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.connector.hbase.source.HbaseRowDataLookupFunction.class);
private static final long serialVersionUID = 1L;
private final String hTableName;
private final byte[] serializedConfig;
private final HbaseTableSchema hbaseTableSchema;
private final String nullStringLiteral;
private transient Connection hConnection;
private transient HTable table;
private transient HbaseSerde serde;
private final long cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
private transient Cache
package connector.hbase.flink14base.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.LocatableInputSplit;
@Internal
public class TableInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    // Raw identifiers of what this split covers: the table and the row-key range.
    private final byte[] tableName;
    private final byte[] startRow;
    private final byte[] endRow;

    /**
     * Creates a split covering a row range of an HBase table.
     *
     * @param splitNumber index of this split
     * @param hostnames hosts on which the data of this split resides
     * @param tableName raw name of the table to read
     * @param startRow first row of the range covered by this split
     * @param endRow last row of the range covered by this split
     */
    public TableInputSplit(
            final int splitNumber,
            final String[] hostnames,
            final byte[] tableName,
            final byte[] startRow,
            final byte[] endRow) {
        super(splitNumber, hostnames);
        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
    }

    /** Returns the raw name of the table this split belongs to. */
    public byte[] getTableName() {
        return tableName;
    }

    /** Returns the first row of this split's key range. */
    public byte[] getStartRow() {
        return startRow;
    }

    /** Returns the last row of this split's key range. */
    public byte[] getEndRow() {
        return endRow;
    }
}
package connector.hbase.flink14base.table;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
@PublicEvolving
public class HbaseConnectorOptions {
public static final ConfigOption TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
.withDescription("The name of Hbase table to connect.");
public static final ConfigOption ZOOKEEPER_QUORUM =
ConfigOptions.key("zookeeper.quorum")
.stringType()
.noDefaultValue()
.withDescription("The Hbase Zookeeper quorum.");
public static final ConfigOption ZOOKEEPER_ZNODE_PARENT =
ConfigOptions.key("zookeeper.znode.parent")
.stringType()
.defaultValue("/hbase")
.withDescription("The root dir in Zookeeper for Hbase cluster.");
public static final ConfigOption NULL_STRING_LITERAL =
ConfigOptions.key("null-string-literal")
.stringType()
.defaultValue("null")
.withDescription(
"Representation for null values for string fields. Hbase source and "
+ "sink encodes/decodes empty bytes as null values for all types except string type.");
public static final ConfigOption SINK_BUFFER_FLUSH_MAX_SIZE =
ConfigOptions.key("sink.buffer-flush.max-size")
.memoryType()
.defaultValue(MemorySize.parse("2mb"))
.withDescription(
"Writing option, maximum size in memory of buffered rows for each "
+ "writing request. This can improve performance for writing data to Hbase database, "
+ "but may increase the latency. Can be set to '0' to disable it. ");
public static final ConfigOption SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
.defaultValue(1000)
.withDescription(
"Writing option, maximum number of rows to buffer for each writing request. "
+ "This can improve performance for writing data to Hbase database, but may increase the latency. "
+ "Can be set to '0' to disable it.");
public static final ConfigOption SINK_BUFFER_FLUSH_INTERVAL =
ConfigOptions.key("sink.buffer-flush.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1))
.withDescription(
"Writing option, the interval to flush any buffered rows. "
+ "This can improve performance for writing data to Hbase database, but may increase the latency. "
+ "Can be set to '0' to disable it. Note, both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush interval set allowing for complete async processing of buffered actions.");
public static final ConfigOption LOOKUP_ASYNC =
ConfigOptions.key("lookup.async")
.booleanType()
.defaultValue(false)
.withDescription("whether to set async lookup.");
public static final ConfigOption LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
.longType()
.defaultValue(-1L)
.withDescription(
"the max number of rows of lookup cache, over this value, the oldest rows will "
+ "be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any of them is "
+ "specified. Cache is not enabled as default.");
public static final ConfigOption LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(0))
.withDescription("the cache time to live.");
public static final ConfigOption LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
.defaultValue(3)
.withDescription("the max retry times if lookup database failed.");
public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
private HbaseConnectorOptions() {}
}
package connector.hbase.flink14base.table;
import connector.hbase.flink14base.options.HbaseLookupOptions;
import connector.hbase.flink14base.options.HbaseWriteOptions;
import connector.hbase.flink14base.util.HbaseConfigurationUtil;
import connector.hbase.flink14base.util.HbaseTableSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import java.util.Map;
import java.util.Properties;
import static connector.hbase.flink14base.table.HbaseConnectorOptions.*;
/** Utilities for validating and extracting HBase connector options. */
public class HbaseConnectorOptionsUtil {

    public static final String PROPERTIES_PREFIX = "properties.";

    // --------------------------------------------------------------------------------------------
    // Validation
    // --------------------------------------------------------------------------------------------

    /**
     * Checks that the schema declares a row key field and that any declared primary key is
     * exactly that single row key column.
     *
     * @throws IllegalArgumentException if no row key is defined or the primary key does not
     *     match the row key field
     */
    public static void validatePrimaryKey(TableSchema schema) {
        HbaseTableSchema hbaseSchema = HbaseTableSchema.fromTableSchema(schema);
        if (!hbaseSchema.getRowKeyName().isPresent()) {
            throw new IllegalArgumentException(
                    "Hbase table requires to define a row key field. "
                            + "A row key field is defined as an atomic type, "
                            + "column families and qualifiers are defined as ROW type.");
        }
        schema.getPrimaryKey()
                .ifPresent(
                        k -> {
                            if (k.getColumns().size() > 1) {
                                throw new IllegalArgumentException(
                                        "Hbase table doesn't support a primary Key on multiple columns. "
                                                + "The primary key of Hbase table must be defined on row key field.");
                            }
                            if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
                                throw new IllegalArgumentException(
                                        "Primary key of Hbase table must be defined on the row key field. "
                                                + "A row key field is defined as an atomic type, "
                                                + "column families and qualifiers are defined as ROW type.");
                            }
                        });
    }

    /** Extracts sink write options (flush interval/size/rows, parallelism) from table options. */
    public static HbaseWriteOptions getHbaseWriteOptions(ReadableConfig tableOptions) {
        HbaseWriteOptions.Builder builder = HbaseWriteOptions.builder();
        builder.setBufferFlushIntervalMillis(
                tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
        builder.setBufferFlushMaxRows(tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS));
        builder.setBufferFlushMaxSizeInBytes(
                tableOptions.get(SINK_BUFFER_FLUSH_MAX_SIZE).getBytes());
        builder.setParallelism(tableOptions.getOptional(SINK_PARALLELISM).orElse(null));
        return builder.build();
    }

    /** Extracts lookup options (async flag, retries, cache size and TTL) from table options. */
    public static HbaseLookupOptions getHbaseLookupOptions(ReadableConfig tableOptions) {
        HbaseLookupOptions.Builder builder = HbaseLookupOptions.builder();
        builder.setLookupAsync(tableOptions.get(LOOKUP_ASYNC));
        builder.setMaxRetryTimes(tableOptions.get(LOOKUP_MAX_RETRIES));
        builder.setCacheExpireMs(tableOptions.get(LOOKUP_CACHE_TTL).toMillis());
        builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS));
        return builder.build();
    }

    /**
     * Builds the HBase client {@link Configuration}: classpath defaults first, then the
     * connector's ZooKeeper options, then any user-supplied "properties.*" options, which
     * have the highest priority.
     */
    public static Configuration getHbaseConfiguration(Map<String, String> options) {
        org.apache.flink.configuration.Configuration tableOptions =
                org.apache.flink.configuration.Configuration.fromMap(options);
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        Configuration hbaseClientConf = HbaseConfigurationUtil.getHbaseConfiguration();
        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM));
        hbaseClientConf.set(
                HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT));
        // add Hbase properties
        final Properties properties = getHbaseClientProperties(options);
        properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString()));
        return hbaseClientConf;
    }

    /** Collects options prefixed with {@link #PROPERTIES_PREFIX}, with the prefix stripped. */
    private static Properties getHbaseClientProperties(Map<String, String> tableOptions) {
        final Properties hbaseProperties = new Properties();
        if (containsHbaseClientProperties(tableOptions)) {
            tableOptions.keySet().stream()
                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
                    .forEach(
                            key -> {
                                final String value = tableOptions.get(key);
                                final String subKey = key.substring(PROPERTIES_PREFIX.length());
                                hbaseProperties.put(subKey, value);
                            });
        }
        return hbaseProperties;
    }

    private static boolean containsHbaseClientProperties(Map<String, String> tableOptions) {
        return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
    }

    private HbaseConnectorOptionsUtil() {}
}
package connector.hbase.flink14base.util;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
/** Utilities for loading, serializing and deserializing HBase {@link Configuration}s. */
public class HbaseConfigurationUtil {

    // Logger must use this class's literal; the upstream Flink class previously referenced
    // is not on this project's classpath and would not compile.
    private static final Logger LOG = LoggerFactory.getLogger(HbaseConfigurationUtil.class);

    /** Environment variable naming a directory with HBase/HDFS client configuration files. */
    public static final String ENV_Hbase_CONF_DIR = "Hbase_CONF_DIR";

    /**
     * Loads the HBase configuration from the classpath and, when present, from the directories
     * named by the {@code Hbase_HOME} and {@code Hbase_CONF_DIR} environment variables.
     * Later resources override earlier ones, so higher-priority files must be added last.
     */
    public static Configuration getHbaseConfiguration() {
        // Instantiate an HbaseConfiguration to load the hbase-default.xml and hbase-site.xml from
        // the classpath.
        Configuration result = HbaseConfiguration.create();
        boolean foundHbaseConfiguration = false;

        // Approach 1: Hbase_HOME environment variable
        String possibleHbaseConfPath = null;
        final String hbaseHome = System.getenv("Hbase_HOME");
        if (hbaseHome != null) {
            LOG.debug("Searching Hbase configuration files in Hbase_HOME: {}", hbaseHome);
            possibleHbaseConfPath = hbaseHome + "/conf";
        }
        if (possibleHbaseConfPath != null) {
            foundHbaseConfiguration = addHbaseConfIfFound(result, possibleHbaseConfPath);
        }

        // Approach 2: Hbase_CONF_DIR environment variable
        String hbaseConfDir = System.getenv(ENV_Hbase_CONF_DIR);
        if (hbaseConfDir != null) {
            LOG.debug("Searching Hbase configuration files in Hbase_CONF_DIR: {}", hbaseConfDir);
            foundHbaseConfiguration =
                    addHbaseConfIfFound(result, hbaseConfDir) || foundHbaseConfiguration;
        }
        if (!foundHbaseConfiguration) {
            LOG.warn(
                    "Could not find Hbase configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
        }
        return result;
    }

    /** Adds hbase-default.xml / hbase-site.xml from the given directory, if they exist. */
    private static boolean addHbaseConfIfFound(
            Configuration configuration, String possibleHbaseConfPath) {
        boolean foundHbaseConfiguration = false;
        if (new File(possibleHbaseConfPath).exists()) {
            if (new File(possibleHbaseConfPath + "/hbase-default.xml").exists()) {
                configuration.addResource(
                        new org.apache.hadoop.fs.Path(
                                possibleHbaseConfPath + "/hbase-default.xml"));
                LOG.debug(
                        "Adding "
                                + possibleHbaseConfPath
                                + "/hbase-default.xml to hbase configuration");
                foundHbaseConfiguration = true;
            }
            if (new File(possibleHbaseConfPath + "/hbase-site.xml").exists()) {
                configuration.addResource(
                        new org.apache.hadoop.fs.Path(possibleHbaseConfPath + "/hbase-site.xml"));
                LOG.debug(
                        "Adding "
                                + possibleHbaseConfPath
                                + "/hbase-site.xml to hbase configuration");
                foundHbaseConfiguration = true;
            }
        }
        return foundHbaseConfiguration;
    }

    /**
     * Serializes a Hadoop {@link Configuration} (not {@code java.io.Serializable}) into bytes
     * via the {@link Writable} protocol.
     */
    public static byte[] serializeConfiguration(Configuration conf) {
        try {
            return serializeWritable(conf);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Encounter an IOException when serialize the Configuration.", e);
        }
    }

    /**
     * Restores a serialized configuration into {@code targetConfig}; a fresh instance is
     * created when {@code targetConfig} is null.
     */
    public static Configuration deserializeConfiguration(
            byte[] serializedConfig, Configuration targetConfig) {
        if (null == targetConfig) {
            targetConfig = new Configuration();
        }
        try {
            deserializeWritable(targetConfig, serializedConfig);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Encounter an IOException when deserialize the Configuration.", e);
        }
        return targetConfig;
    }

    // The generic bound <T extends Writable> was previously missing, so these two helpers
    // did not compile.
    private static <T extends Writable> byte[] serializeWritable(T writable) throws IOException {
        Preconditions.checkArgument(writable != null);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
        writable.write(outputStream);
        return byteArrayOutputStream.toByteArray();
    }

    private static <T extends Writable> void deserializeWritable(T writable, byte[] bytes)
            throws IOException {
        Preconditions.checkArgument(writable != null);
        Preconditions.checkArgument(bytes != null);
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
        writable.readFields(dataInputStream);
    }

    /**
     * Builds an HBase client configuration, adding core-site.xml, hdfs-site.xml and
     * hbase-site.xml from the directory named by {@link #ENV_Hbase_CONF_DIR}, if present.
     */
    public static Configuration createHbaseConf() {
        Configuration hbaseClientConf = HbaseConfiguration.create();
        String hbaseConfDir = System.getenv(ENV_Hbase_CONF_DIR);
        if (hbaseConfDir != null) {
            if (new File(hbaseConfDir).exists()) {
                String coreSite = hbaseConfDir + "/core-site.xml";
                String hdfsSite = hbaseConfDir + "/hdfs-site.xml";
                String hbaseSite = hbaseConfDir + "/hbase-site.xml";
                if (new File(coreSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(coreSite));
                    LOG.info("Adding " + coreSite + " to hbase configuration");
                }
                if (new File(hdfsSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hdfsSite));
                    LOG.info("Adding " + hdfsSite + " to hbase configuration");
                }
                if (new File(hbaseSite).exists()) {
                    hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hbaseSite));
                    LOG.info("Adding " + hbaseSite + " to hbase configuration");
                }
            } else {
                LOG.warn(
                        "Hbase config directory '{}' not found, cannot load Hbase configuration.",
                        hbaseConfDir);
            }
        } else {
            LOG.warn(
                    "{} env variable not found, cannot load Hbase configuration.",
                    ENV_Hbase_CONF_DIR);
        }
        return hbaseClientConf;
    }

    private HbaseConfigurationUtil() {}
}
package connector.hbase.flink14base.util;
import org.apache.flink.table.data.*;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
import static org.apache.flink.util.Preconditions.checkArgument;
public class HbaseSerde {
private static final byte[] EMPTY_BYTES = new byte[] {};
private static final int MIN_TIMESTAMP_PRECISION = 0;
private static final int MAX_TIMESTAMP_PRECISION = 3;
private static final int MIN_TIME_PRECISION = 0;
private static final int MAX_TIME_PRECISION = 3;
private final byte[] nullStringBytes;
// row key index in output row
private final int rowkeyIndex;
// family keys
private final byte[][] families;
// qualifier keys
private final byte[][][] qualifiers;
private final int fieldLength;
private GenericRowData reusedRow;
private GenericRowData[] reusedFamilyRows;
private final @Nullable FieldEncoder keyEncoder;
private final @Nullable FieldDecoder keyDecoder;
private final FieldEncoder[][] qualifierEncoders;
private final FieldDecoder[][] qualifierDecoders;
private final GenericRowData rowWithRowKey;
/**
 * Builds the serializer/deserializer between Flink rows and HBase cells for the given schema.
 *
 * <p>Precomputes per-family/per-qualifier encoders and decoders, plus reusable row holders
 * used by {@code convertToReusedRow}.
 *
 * @param hbaseSchema table schema describing the row key, families and qualifiers
 * @param nullStringLiteral literal written/read as the representation of NULL string values
 */
public HbaseSerde(HbaseTableSchema hbaseSchema, final String nullStringLiteral) {
    this.families = hbaseSchema.getFamilyKeys();
    this.rowkeyIndex = hbaseSchema.getRowKeyIndex();
    LogicalType rowkeyType =
            hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);
    // field length need take row key into account if it exists.
    if (rowkeyIndex != -1 && rowkeyType != null) {
        this.fieldLength = families.length + 1;
        this.keyEncoder = createFieldEncoder(rowkeyType);
        this.keyDecoder = createFieldDecoder(rowkeyType);
    } else {
        // No row key mapped: key codecs stay null; createPutMutation/createGet reject use.
        this.fieldLength = families.length;
        this.keyEncoder = null;
        this.keyDecoder = null;
    }
    // nullStringBytes must be assigned before the encoder/decoder lambdas below capture it.
    this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
    // prepare output rows
    this.reusedRow = new GenericRowData(fieldLength);
    this.reusedFamilyRows = new GenericRowData[families.length];
    this.qualifiers = new byte[families.length][][];
    this.qualifierEncoders = new FieldEncoder[families.length][];
    this.qualifierDecoders = new FieldDecoder[families.length][];
    String[] familyNames = hbaseSchema.getFamilyNames();
    for (int f = 0; f < families.length; f++) {
        this.qualifiers[f] = hbaseSchema.getQualifierKeys(familyNames[f]);
        DataType[] dataTypes = hbaseSchema.getQualifierDataTypes(familyNames[f]);
        // One encoder/decoder per qualifier, derived from its declared logical type.
        this.qualifierEncoders[f] =
                Arrays.stream(dataTypes)
                        .map(DataType::getLogicalType)
                        .map(t -> createNullableFieldEncoder(t, nullStringBytes))
                        .toArray(FieldEncoder[]::new);
        this.qualifierDecoders[f] =
                Arrays.stream(dataTypes)
                        .map(DataType::getLogicalType)
                        .map(t -> createNullableFieldDecoder(t, nullStringBytes))
                        .toArray(FieldDecoder[]::new);
        this.reusedFamilyRows[f] = new GenericRowData(dataTypes.length);
    }
    // Single-field holder reused by createGet(Object) to encode a bare row key.
    this.rowWithRowKey = new GenericRowData(1);
}
/**
 * Builds a {@link Put} from the given row, or {@code null} for a dirty record whose
 * encoded row key is empty.
 */
public @Nullable Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // upsert
    Put put = new Put(rowkey);
    for (int pos = 0; pos < fieldLength; pos++) {
        if (pos == rowkeyIndex) {
            continue;
        }
        // Map the row position to a family index (the row key occupies one slot).
        int familyIdx = pos > rowkeyIndex ? pos - 1 : pos;
        byte[] familyKey = families[familyIdx];
        RowData familyRow = row.getRow(pos, qualifiers[familyIdx].length);
        for (int q = 0; q < qualifiers[familyIdx].length; q++) {
            // Serialize each qualifier value with its precomputed encoder.
            byte[] value = qualifierEncoders[familyIdx][q].encode(familyRow, q);
            put.addColumn(familyKey, qualifiers[familyIdx][q], value);
        }
    }
    return put;
}
/**
 * Builds a {@link Delete} covering all mapped columns of the given row, or {@code null}
 * for a dirty record whose encoded row key is empty.
 */
public @Nullable Delete createDeleteMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // delete
    Delete delete = new Delete(rowkey);
    for (int pos = 0; pos < fieldLength; pos++) {
        if (pos == rowkeyIndex) {
            continue;
        }
        // Map the row position to a family index (the row key occupies one slot).
        int familyIdx = pos > rowkeyIndex ? pos - 1 : pos;
        byte[] familyKey = families[familyIdx];
        for (byte[] qualifier : qualifiers[familyIdx]) {
            delete.addColumn(familyKey, qualifier);
        }
    }
    return delete;
}
/** Creates a {@link Scan} restricted to exactly the mapped families and qualifiers. */
public Scan createScan() {
    Scan scan = new Scan();
    for (int familyIdx = 0; familyIdx < families.length; familyIdx++) {
        for (byte[] qualifier : qualifiers[familyIdx]) {
            scan.addColumn(families[familyIdx], qualifier);
        }
    }
    return scan;
}
/**
 * Builds a {@link Get} request for the given external row key value, selecting all mapped
 * columns.
 *
 * <p>Fix: annotated the return type {@code @Nullable} — this method returns {@code null}
 * for an empty encoded row key, matching the contract (and annotations) of
 * {@code createPutMutation}/{@code createDeleteMutation}.
 *
 * @param rowKey the row key value in Flink internal representation
 * @return the populated {@link Get}, or {@code null} when the encoded row key is empty
 *     (such dirty records are dropped)
 */
public @Nullable Get createGet(Object rowKey) {
    checkArgument(keyEncoder != null, "row key is not set.");
    // wrap the key in the single-field reusable row so the generic key encoder can run
    rowWithRowKey.setField(0, rowKey);
    byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    Get get = new Get(rowkey);
    for (int f = 0; f < families.length; f++) {
        byte[] family = families[f];
        for (byte[] qualifier : qualifiers[f]) {
            get.addColumn(family, qualifier);
        }
    }
    return get;
}
/**
 * Converts an HBase {@link Result} into a freshly allocated {@link RowData}.
 *
 * <p>New row containers are allocated on every call so that callers may safely cache the
 * returned object (unlike {@link #convertToReusedRow(Result)}).
 */
public RowData convertToNewRow(Result result) {
    GenericRowData topLevelRow = new GenericRowData(fieldLength);
    GenericRowData[] nestedRows = new GenericRowData[families.length];
    for (int familyIdx = 0; familyIdx < families.length; familyIdx++) {
        nestedRows[familyIdx] = new GenericRowData(qualifiers[familyIdx].length);
    }
    return convertToRow(result, topLevelRow, nestedRows);
}
/**
 * Converts an HBase {@link Result} into the shared, reusable {@link RowData} instances to
 * avoid per-record allocation; callers must not cache the returned object.
 */
public RowData convertToReusedRow(Result result) {
    RowData converted = convertToRow(result, reusedRow, reusedFamilyRows);
    return converted;
}
/**
 * Fills the supplied row containers from an HBase {@link Result} and returns the top-level
 * row: the row key field (if mapped) is decoded from the result's row, every other field is
 * a nested row decoded column-by-column from its family.
 */
private RowData convertToRow(
        Result result, GenericRowData resultRow, GenericRowData[] familyRows) {
    for (int pos = 0; pos < fieldLength; pos++) {
        if (pos == rowkeyIndex) {
            assert keyDecoder != null;
            resultRow.setField(rowkeyIndex, keyDecoder.decode(result.getRow()));
            continue;
        }
        // map the logical field position onto the family index (the row key field is skipped)
        int familyIdx = (rowkeyIndex != -1 && pos > rowkeyIndex) ? pos - 1 : pos;
        byte[] family = families[familyIdx];
        GenericRowData nestedRow = familyRows[familyIdx];
        for (int col = 0; col < qualifiers[familyIdx].length; col++) {
            // fetch and decode the stored cell value for this family/qualifier pair
            byte[] cell = result.getValue(family, qualifiers[familyIdx][col]);
            nestedRow.setField(col, qualifierDecoders[familyIdx][col].decode(cell));
        }
        resultRow.setField(pos, nestedRow);
    }
    return resultRow;
}
/**
 * Converts an HBase {@link Result} into the shared, reusable {@link RowData} instances.
 *
 * <p>Fix: the previous body duplicated {@code convertToRow(result, reusedRow,
 * reusedFamilyRows)} line-for-line; it now delegates, removing the duplication with
 * identical behavior.
 *
 * @deprecated use {@link #convertToReusedRow(Result)} (reused rows) or
 *     {@link #convertToNewRow(Result)} (fresh rows) instead.
 */
@Deprecated
public RowData convertToRow(Result result) {
    return convertToRow(result, reusedRow, reusedFamilyRows);
}
// ------------------------------------------------------------------------------------
// Hbase Runtime Encoders
// ------------------------------------------------------------------------------------
/**
 * Serializable runtime encoder turning one field of a {@link RowData} (at position
 * {@code pos}) into its HBase {@code byte[]} representation. Serializable so instances can
 * be shipped to the cluster with the operator.
 */
@FunctionalInterface
private interface FieldEncoder extends Serializable {
byte[] encode(RowData row, int pos);
}
/**
 * Wraps the plain encoder for {@code fieldType} with null handling when the type is
 * nullable: null strings are stored as the configured null-literal bytes (HBase can hold
 * genuinely empty string bytes), null values of every other type as empty bytes.
 */
private static FieldEncoder createNullableFieldEncoder(
        LogicalType fieldType, final byte[] nullStringBytes) {
    final FieldEncoder encoder = createFieldEncoder(fieldType);
    if (!fieldType.isNullable()) {
        return encoder;
    }
    if (hasFamily(fieldType, LogicalTypeFamily.CHARACTER_STRING)) {
        // special logic for null string values, because HBase can store empty bytes for
        // a real (non-null) empty string
        return (row, pos) -> row.isNullAt(pos) ? nullStringBytes : encoder.encode(row, pos);
    }
    // encode empty bytes for null values
    return (row, pos) -> row.isNullAt(pos) ? EMPTY_BYTES : encoder.encode(row, pos);
}
/**
 * Creates the raw (null-unaware) encoder for a field of the given logical type.
 *
 * <p>Fix: the BINARY/VARBINARY branch referenced {@code Rowdata::getBinary} — wrong
 * capitalization of {@code RowData}, which does not compile.
 *
 * @param fieldType the Flink logical type of the field
 * @return an encoder producing the HBase byte representation of that field
 * @throws UnsupportedOperationException if the type is unsupported, or a TIME/TIMESTAMP
 *     precision falls outside the connector's supported range
 */
private static FieldEncoder createFieldEncoder(LogicalType fieldType) {
    // ordered by type root definition
    switch (fieldType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            // get the underlying UTF-8 bytes
            return (row, pos) -> row.getString(pos).toBytes();
        case BOOLEAN:
            return (row, pos) -> Bytes.toBytes(row.getBoolean(pos));
        case BINARY:
        case VARBINARY:
            return RowData::getBinary;
        case DECIMAL:
            return createDecimalEncoder((DecimalType) fieldType);
        case TINYINT:
            return (row, pos) -> new byte[] {row.getByte(pos)};
        case SMALLINT:
            return (row, pos) -> Bytes.toBytes(row.getShort(pos));
        case INTEGER:
        case DATE:
        case INTERVAL_YEAR_MONTH:
            return (row, pos) -> Bytes.toBytes(row.getInt(pos));
        case TIME_WITHOUT_TIME_ZONE:
            final int timePrecision = getPrecision(fieldType);
            if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
                throw new UnsupportedOperationException(
                        String.format(
                                "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                        + "Hbase connector",
                                timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
            }
            // TIME is stored as millis-of-day (int)
            return (row, pos) -> Bytes.toBytes(row.getInt(pos));
        case BIGINT:
        case INTERVAL_DAY_TIME:
            return (row, pos) -> Bytes.toBytes(row.getLong(pos));
        case FLOAT:
            return (row, pos) -> Bytes.toBytes(row.getFloat(pos));
        case DOUBLE:
            return (row, pos) -> Bytes.toBytes(row.getDouble(pos));
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            final int timestampPrecision = getPrecision(fieldType);
            if (timestampPrecision < MIN_TIMESTAMP_PRECISION
                    || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
                throw new UnsupportedOperationException(
                        String.format(
                                "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                        + "Hbase connector",
                                timestampPrecision,
                                MIN_TIMESTAMP_PRECISION,
                                MAX_TIMESTAMP_PRECISION));
            }
            return createTimestampEncoder(timestampPrecision);
        default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
    }
}
/**
 * Creates an encoder that serializes a DECIMAL field (with the type's fixed precision and
 * scale) through {@link java.math.BigDecimal} into HBase bytes.
 */
private static FieldEncoder createDecimalEncoder(DecimalType decimalType) {
    final int decimalPrecision = decimalType.getPrecision();
    final int decimalScale = decimalType.getScale();
    return (row, pos) ->
            Bytes.toBytes(row.getDecimal(pos, decimalPrecision, decimalScale).toBigDecimal());
}
/**
 * Creates an encoder that serializes a TIMESTAMP field of the given precision as its
 * epoch-millisecond value (long).
 */
private static FieldEncoder createTimestampEncoder(final int precision) {
    return (row, pos) -> Bytes.toBytes(row.getTimestamp(pos, precision).getMillisecond());
}
// ------------------------------------------------------------------------------------
// Hbase Runtime Decoders
// ------------------------------------------------------------------------------------
/**
 * Serializable runtime decoder turning an HBase-stored {@code byte[]} value back into a
 * Flink internal object; returns {@code null} for absent/null cells.
 */
@FunctionalInterface
private interface FieldDecoder extends Serializable {
@Nullable
Object decode(byte[] value);
}
/**
 * Wraps the plain decoder for {@code fieldType} with null handling when the type is
 * nullable: for strings, a missing cell or the configured null-literal bytes decode to
 * null; for all other types, a missing cell or empty bytes decode to null.
 */
private static FieldDecoder createNullableFieldDecoder(
        LogicalType fieldType, final byte[] nullStringBytes) {
    final FieldDecoder decoder = createFieldDecoder(fieldType);
    if (!fieldType.isNullable()) {
        return decoder;
    }
    if (hasFamily(fieldType, LogicalTypeFamily.CHARACTER_STRING)) {
        // the stored null-literal marker (or an absent cell) decodes to null
        return value ->
                (value == null || Arrays.equals(value, nullStringBytes))
                        ? null
                        : decoder.decode(value);
    }
    // for non-string types, empty bytes were written for null
    return value -> (value == null || value.length == 0) ? null : decoder.decode(value);
}
/**
 * Creates the raw (null-unaware) decoder for a field of the given logical type.
 *
 * <p>Fix: the CHAR/VARCHAR branch referenced {@code Stringdata::fromBytes} — wrong
 * capitalization of {@code StringData}, which does not compile.
 *
 * @param fieldType the Flink logical type of the field
 * @return a decoder converting HBase bytes into the field's internal representation
 * @throws UnsupportedOperationException if the type is unsupported, or a TIME/TIMESTAMP
 *     precision falls outside the connector's supported range
 */
private static FieldDecoder createFieldDecoder(LogicalType fieldType) {
    // ordered by type root definition
    switch (fieldType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
            // reuse bytes
            return StringData::fromBytes;
        case BOOLEAN:
            return Bytes::toBoolean;
        case BINARY:
        case VARBINARY:
            return value -> value;
        case DECIMAL:
            return createDecimalDecoder((DecimalType) fieldType);
        case TINYINT:
            return value -> value[0];
        case SMALLINT:
            return Bytes::toShort;
        case INTEGER:
        case DATE:
        case INTERVAL_YEAR_MONTH:
            return Bytes::toInt;
        case TIME_WITHOUT_TIME_ZONE:
            final int timePrecision = getPrecision(fieldType);
            if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
                throw new UnsupportedOperationException(
                        String.format(
                                "The precision %s of TIME type is out of the range [%s, %s] supported by "
                                        + "Hbase connector",
                                timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
            }
            // TIME is stored as millis-of-day (int)
            return Bytes::toInt;
        case BIGINT:
        case INTERVAL_DAY_TIME:
            return Bytes::toLong;
        case FLOAT:
            return Bytes::toFloat;
        case DOUBLE:
            return Bytes::toDouble;
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            final int timestampPrecision = getPrecision(fieldType);
            if (timestampPrecision < MIN_TIMESTAMP_PRECISION
                    || timestampPrecision > MAX_TIMESTAMP_PRECISION) {
                throw new UnsupportedOperationException(
                        String.format(
                                "The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
                                        + "Hbase connector",
                                timestampPrecision,
                                MIN_TIMESTAMP_PRECISION,
                                MAX_TIMESTAMP_PRECISION));
            }
            return createTimestampDecoder();
        default:
            throw new UnsupportedOperationException("Unsupported type: " + fieldType);
    }
}
/**
 * Creates a decoder that reads HBase bytes as a {@link java.math.BigDecimal} and converts
 * it to {@code DecimalData} with the type's fixed precision and scale.
 */
private static FieldDecoder createDecimalDecoder(DecimalType decimalType) {
    final int targetPrecision = decimalType.getPrecision();
    final int targetScale = decimalType.getScale();
    return value ->
            DecimalData.fromBigDecimal(Bytes.toBigDecimal(value), targetPrecision, targetScale);
}
/**
 * Creates a decoder that interprets stored bytes as an epoch-millisecond long and converts
 * it to {@code TimestampData}.
 */
private static FieldDecoder createTimestampDecoder() {
    // TODO: support higher precision
    return value -> TimestampData.fromEpochMillis(Bytes.toLong(value));
}
}
package connector.hbase.flink14base.util;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
public class HbaseTableSchema implements Serializable {
private static final long serialVersionUID = 1L;
// A Map with key as column family.
private final Map> familyMap = new linkedHashMap<>();
// information about rowkey
private RowKeyInfo rowKeyInfo;
// charset to parse Hbase keys and strings. UTF-8 by default.
private String charset = "UTF-8";
public void addColumn(String family, String qualifier, Class> clazz) {
Preconditions.checkNotNull(clazz, "class type");
DataType type = TypeConversions.fromLegacyInfoToDataType(TypeExtractor.getForClass(clazz));
addColumn(family, qualifier, type);
}
public void addColumn(String family, String qualifier, DataType type) {
Preconditions.checkNotNull(family, "family name");
Preconditions.checkNotNull(qualifier, "qualifier name");
Preconditions.checkNotNull(type, "data type");
Map qualifierMap = this.familyMap.get(family);
if (!HbaseTypeUtils.isSupportedType(type.getLogicalType())) {
// throw exception
throw new IllegalArgumentException(
"Unsupported class type found "
+ type
+ ". "
+ "Better to use byte[].class and deserialize using user defined scalar functions");
}
if (qualifierMap == null) {
qualifierMap = new linkedHashMap<>();
}
qualifierMap.put(qualifier, type);
familyMap.put(family, qualifierMap);
}
public void setRowKey(String rowKeyName, Class> clazz) {
Preconditions.checkNotNull(clazz, "row key class type");
DataType type = TypeConversions.fromLegacyInfoToDataType(TypeExtractor.getForClass(clazz));
setRowKey(rowKeyName, type);
}
public void setRowKey(String rowKeyName, DataType type) {
Preconditions.checkNotNull(rowKeyName, "row key field name");
Preconditions.checkNotNull(type, "row key data type");
if (!HbaseTypeUtils.isSupportedType(type.getLogicalType())) {
// throw exception
throw new IllegalArgumentException(
"Unsupported class type found "
+ type
+ ". "
+ "Better to use byte[].class and deserialize using user defined scalar functions");
}
if (rowKeyInfo != null) {
throw new IllegalArgumentException("Row key can't be set multiple times.");
}
this.rowKeyInfo = new RowKeyInfo(rowKeyName, type, familyMap.size());
}
public void setCharset(String charset) {
this.charset = charset;
}
public String[] getFamilyNames() {
return this.familyMap.keySet().toArray(new String[0]);
}
public byte[][] getFamilyKeys() {
Charset c = Charset.forName(charset);
byte[][] familyKeys = new byte[this.familyMap.size()][];
int i = 0;
for (String name : this.familyMap.keySet()) {
familyKeys[i++] = name.getBytes(c);
}
return familyKeys;
}
public String[] getQualifierNames(String family) {
Map qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
String[] qualifierNames = new String[qualifierMap.size()];
int i = 0;
for (String qualifier : qualifierMap.keySet()) {
qualifierNames[i] = qualifier;
i++;
}
return qualifierNames;
}
public byte[][] getQualifierKeys(String family) {
Map qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
Charset c = Charset.forName(charset);
byte[][] qualifierKeys = new byte[qualifierMap.size()][];
int i = 0;
for (String name : qualifierMap.keySet()) {
qualifierKeys[i++] = name.getBytes(c);
}
return qualifierKeys;
}
public TypeInformation>[] getQualifierTypes(String family) {
DataType[] dataTypes = getQualifierDataTypes(family);
return Arrays.stream(dataTypes)
.map(TypeConversions::fromDataTypeToLegacyInfo)
.toArray(TypeInformation[]::new);
}
public DataType[] getQualifierDataTypes(String family) {
Map qualifierMap = familyMap.get(family);
if (qualifierMap == null) {
throw new IllegalArgumentException("Family " + family + " does not exist in schema.");
}
DataType[] dataTypes = new DataType[qualifierMap.size()];
int i = 0;
for (DataType dataType : qualifierMap.values()) {
dataTypes[i] = dataType;
i++;
}
return dataTypes;
}
private Map getFamilyInfo(String family) {
return familyMap.get(family);
}
public String getStringCharset() {
return this.charset;
}
public int getRowKeyIndex() {
return rowKeyInfo == null ? -1 : rowKeyInfo.rowKeyIndex;
}
public Optional> getRowKeyTypeInfo() {
return rowKeyInfo == null
? Optional.empty()
: Optional.of(TypeConversions.fromDataTypeToLegacyInfo(rowKeyInfo.rowKeyType));
}
public Optional getRowKeyDataType() {
return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyType);
}
public Optional getRowKeyName() {
return rowKeyInfo == null ? Optional.empty() : Optional.of(rowKeyInfo.rowKeyName);
}
public HbaseTableSchema getProjectedHbaseTableSchema(int[] projectedFields) {
if (projectedFields == null) {
return this;
}
HbaseTableSchema newSchema = new HbaseTableSchema();
String[] fieldNames = convertsToTableSchema().getFieldNames();
for (int projectedField : projectedFields) {
String name = fieldNames[projectedField];
if (rowKeyInfo != null && name.equals(rowKeyInfo.rowKeyName)) {
newSchema.setRowKey(rowKeyInfo.rowKeyName, rowKeyInfo.rowKeyType);
} else {
Map familyInfo = getFamilyInfo(name);
for (Map.Entry entry : familyInfo.entrySet()) {
// create the newSchema
String qualifier = entry.getKey();
newSchema.addColumn(name, qualifier, entry.getValue());
}
}
}
newSchema.setCharset(charset);
return newSchema;
}
public TableSchema convertsToTableSchema() {
String[] familyNames = getFamilyNames();
if (rowKeyInfo != null) {
String[] fieldNames = new String[familyNames.length + 1];
DataType[] fieldTypes = new DataType[familyNames.length + 1];
for (int i = 0; i < fieldNames.length; i++) {
if (i == rowKeyInfo.rowKeyIndex) {
fieldNames[i] = rowKeyInfo.rowKeyName;
fieldTypes[i] = rowKeyInfo.rowKeyType;
} else {
int familyIndex = i < rowKeyInfo.rowKeyIndex ? i : i - 1;
String family = familyNames[familyIndex];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(
getQualifierNames(family), getQualifierDataTypes(family));
}
}
return TableSchema.builder().fields(fieldNames, fieldTypes).build();
} else {
String[] fieldNames = new String[familyNames.length];
DataType[] fieldTypes = new DataType[familyNames.length];
for (int i = 0; i < fieldNames.length; i++) {
String family = familyNames[i];
fieldNames[i] = family;
fieldTypes[i] =
getRowDataType(getQualifierNames(family), getQualifierDataTypes(family));
}
return TableSchema.builder().fields(fieldNames, fieldTypes).build();
}
}
private static DataType getRowDataType(String[] fieldNames, DataType[] fieldTypes) {
final DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
for (int j = 0; j < fieldNames.length; j++) {
fields[j] = DataTypes.FIELD(fieldNames[j], fieldTypes[j]);
}
return DataTypes.ROW(fields);
}
public static HbaseTableSchema fromTableSchema(TableSchema schema) {
HbaseTableSchema hbaseSchema = new HbaseTableSchema();
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
for (RowType.RowField field : rowType.getFields()) {
LogicalType fieldType = field.getType();
if (fieldType.getTypeRoot() == LogicalTypeRoot.ROW) {
RowType familyType = (RowType) fieldType;
String familyName = field.getName();
for (RowType.RowField qualifier : familyType.getFields()) {
hbaseSchema.addColumn(
familyName,
qualifier.getName(),
fromLogicalToDataType(qualifier.getType()));
}
} else if (fieldType.getChildren().size() == 0) {
hbaseSchema.setRowKey(field.getName(), fromLogicalToDataType(fieldType));
} else {
throw new IllegalArgumentException(
"Unsupported field type '" + fieldType + "' for Hbase.");
}
}
return hbaseSchema;
}
// ------------------------------------------------------------------------------------
private static class RowKeyInfo implements Serializable {
private static final long serialVersionUID = 1L;
final String rowKeyName;
final DataType rowKeyType;
final int rowKeyIndex;
RowKeyInfo(String rowKeyName, DataType rowKeyType, int rowKeyIndex) {
this.rowKeyName = rowKeyName;
this.rowKeyType = rowKeyType;
this.rowKeyIndex = rowKeyIndex;
}
}
}
package connector.hbase.flink14base.util;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hbase.util.Bytes;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
public class HbaseTypeUtils {
private static final byte[] EMPTY_BYTES = new byte[] {};
private static final int MIN_TIMESTAMP_PRECISION = 0;
private static final int MAX_TIMESTAMP_PRECISION = 3;
private static final int MIN_TIME_PRECISION = 0;
private static final int MAX_TIME_PRECISION = 3;
public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
switch (typeIdx) {
case 0: // byte[]
return value;
case 1: // String
return Arrays.equals(EMPTY_BYTES, value) ? null : new String(value, stringCharset);
case 2: // byte
return value[0];
case 3:
return Bytes.toShort(value);
case 4:
return Bytes.toInt(value);
case 5:
return Bytes.toLong(value);
case 6:
return Bytes.toFloat(value);
case 7:
return Bytes.toDouble(value);
case 8:
return Bytes.toBoolean(value);
case 9: // sql.Timestamp encoded as long
return new Timestamp(Bytes.toLong(value));
case 10: // sql.Date encoded as long
return new Date(Bytes.toLong(value));
case 11: // sql.Time encoded as long
return new Time(Bytes.toLong(value));
case 12:
return Bytes.toBigDecimal(value);
case 13:
return new BigInteger(value);
default:
throw new IllegalArgumentException("unsupported type index:" + typeIdx);
}
}
public static byte[] serializeFromObject(Object value, int typeIdx, Charset stringCharset) {
switch (typeIdx) {
case 0: // byte[]
return (byte[]) value;
case 1: // external String
return value == null ? EMPTY_BYTES : ((String) value).getBytes(stringCharset);
case 2: // byte
return value == null ? EMPTY_BYTES : new byte[] {(byte) value};
case 3:
return Bytes.toBytes((short) value);
case 4:
return Bytes.toBytes((int) value);
case 5:
return Bytes.toBytes((long) value);
case 6:
return Bytes.toBytes((float) value);
case 7:
return Bytes.toBytes((double) value);
case 8:
return Bytes.toBytes((boolean) value);
case 9: // sql.Timestamp encoded to Long
return Bytes.toBytes(((Timestamp) value).getTime());
case 10: // sql.Date encoded as long
return Bytes.toBytes(((Date) value).getTime());
case 11: // sql.Time encoded as long
return Bytes.toBytes(((Time) value).getTime());
case 12:
return Bytes.toBytes((BigDecimal) value);
case 13:
return ((BigInteger) value).toByteArray();
default:
throw new IllegalArgumentException("unsupported type index:" + typeIdx);
}
}
public static int getTypeIndex(TypeInformation typeInfo) {
return getTypeIndex(typeInfo.getTypeClass());
}
public static boolean isSupportedType(Class> clazz) {
return getTypeIndex(clazz) != -1;
}
private static int getTypeIndex(Class> clazz) {
if (byte[].class.equals(clazz)) {
return 0;
} else if (String.class.equals(clazz)) {
return 1;
} else if (Byte.class.equals(clazz)) {
return 2;
} else if (Short.class.equals(clazz)) {
return 3;
} else if (Integer.class.equals(clazz)) {
return 4;
} else if (Long.class.equals(clazz)) {
return 5;
} else if (Float.class.equals(clazz)) {
return 6;
} else if (Double.class.equals(clazz)) {
return 7;
} else if (Boolean.class.equals(clazz)) {
return 8;
} else if (Timestamp.class.equals(clazz)) {
return 9;
} else if (Date.class.equals(clazz)) {
return 10;
} else if (Time.class.equals(clazz)) {
return 11;
} else if (BigDecimal.class.equals(clazz)) {
return 12;
} else if (BigInteger.class.equals(clazz)) {
return 13;
} else {
return -1;
}
}
public static boolean isSupportedType(LogicalType type) {
// ordered by type root definition
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
case BOOLEAN:
case BINARY:
case VARBINARY:
case DECIMAL:
case TINYINT:
case SMALLINT:
case INTEGER:
case DATE:
case INTERVAL_YEAR_MONTH:
case BIGINT:
case INTERVAL_DAY_TIME:
case FLOAT:
case DOUBLE:
return true;
case TIME_WITHOUT_TIME_ZONE:
final int timePrecision = getPrecision(type);
if (timePrecision < MIN_TIME_PRECISION || timePrecision > MAX_TIME_PRECISION) {
throw new UnsupportedOperationException(
String.format(
"The precision %s of TIME type is out of the range [%s, %s] supported by "
+ "Hbase connector",
timePrecision, MIN_TIME_PRECISION, MAX_TIME_PRECISION));
}
return true;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(type);
if (timestampPrecision < MIN_TIMESTAMP_PRECISION
|| timestampPrecision > MAX_TIMESTAMP_PRECISION) {
throw new UnsupportedOperationException(
String.format(
"The precision %s of TIMESTAMP type is out of the range [%s, %s] supported by "
+ "Hbase connector",
timestampPrecision,
MIN_TIMESTAMP_PRECISION,
MAX_TIMESTAMP_PRECISION));
}
return true;
case TIMESTAMP_WITH_TIME_ZONE:
case ARRAY:
case MULTISET:
case MAP:
case ROW:
case STRUCTURED_TYPE:
case DISTINCT_TYPE:
case RAW:
case NULL:
case SYMBOL:
case UNRESOLVED:
return false;
default:
throw new IllegalArgumentException();
}
}
}