Kudu relies on precise, synchronized clocks across the cluster, so the NTP service must be installed on every node.
1. Install the NTP service
[root@Linux121 clickhouse-server]# yum -y install ntp
[root@Linux121 clickhouse-server]# vim /etc/ntp.conf
On every node, comment out the following lines:
#server 0.centos.pool.ntp.org iburst
#server 1.centos.pool.ntp.org iburst
#server 2.centos.pool.ntp.org iburst
#server 3.centos.pool.ntp.org iburst
On the node that will serve as the time master, add the following settings:
# (the first IP is the gateway)
restrict 172.16.131.1 mask 255.255.255.0 notrap nomodify
server 172.16.131.129 prefer
server 127.127.1.0
fudge 127.127.1.0 stratum 10
On the other nodes, modify the same file as follows:
server 172.16.131.129 prefer
server 127.127.1.0
fudge 127.127.1.0 stratum 10
Start the ntpd service on every node:
service ntpd start
Check the ntpd service status:
[root@Linux123 bin]# service ntpd status
Redirecting to /bin/systemctl status ntpd.service
● ntpd.service - Network Time Service
Loaded: loaded (/usr/lib/systemd/system/ntpd.service; disabled; vendor preset: disabled)
Active: active (running) since Thu 2022-04-21 19:09:10 CST; 24s ago
Process: 10477 ExecStart=/usr/sbin/ntpd -u ntp:ntp $OPTIONS (code=exited, status=0/SUCCESS)
Main PID: 10478 (ntpd)
Tasks: 1
CGroup: /system.slice/ntpd.service
└─10478 /usr/sbin/ntpd -u ntp:ntp -g
Apr 21 19:09:10 Linux123 ntpd[10478]: Listen normally on 2 lo 127.0.0.1 UDP 123
Apr 21 19:09:10 Linux123 ntpd[10478]: Listen normally on 3 ens33 172.16.131.131 UDP 123
Apr 21 19:09:10 Linux123 ntpd[10478]: Listen normally on 4 virbr0 192.168.122.1 UDP 123
Apr 21 19:09:10 Linux123 ntpd[10478]: Listen normally on 5 lo ::1 UDP 123
Apr 21 19:09:10 Linux123 ntpd[10478]: Listen normally on 6 ens33 fe80::20c:29ff:fe...23
Apr 21 19:09:10 Linux123 ntpd[10478]: Listening on routing socket on fd #23 for in...es
Apr 21 19:09:10 Linux123 ntpd[10478]: 0.0.0.0 c016 06 restart
Apr 21 19:09:10 Linux123 ntpd[10478]: 0.0.0.0 c012 02 freq_set kernel 0.000 PPM
Apr 21 19:09:10 Linux123 ntpd[10478]: 0.0.0.0 c011 01 freq_not_set
Apr 21 19:09:12 Linux123 ntpd[10478]: 0.0.0.0 c514 04 freq_mode
Hint: Some lines were ellipsized, use -l to show in full.
[root@Linux123 bin]# ntpstat
synchronised to local net (127.127.1.0) at stratum 11
time correct to within 1948 ms
polling server every 64 s
If starting ntpd fails with an error like the following:
[root@Linux122 clickhouse-server]# service ntpd status
Redirecting to /bin/systemctl status ntpd.service
● ntpd.service - Network Time Service
   Loaded: loaded (/usr/lib/systemd/system/ntpd.service; enabled; vendor preset: disabled)
   Active: failed (Result: exit-code) since Thu 2022-04-21 19:09:03 CST; 4min 31s ago
  Process: 11159 ExecStart=/usr/sbin/ntpd -u ntp:ntp $OPTIONS (code=exited, status=0/SUCCESS)
 Main PID: 11160 (code=exited, status=1/FAILURE)
Apr 21 19:09:03 Linux122 systemd[1]: Starting Network Time Service...
Apr 21 19:09:03 Linux122 systemd[1]: Started Network Time Service.
Apr 21 19:09:03 Linux122 ntpd[11160]: proto: precision = 0.037 usec
Apr 21 19:09:03 Linux122 ntpd[11160]: 0.0.0.0 c01d 0d kern kernel time sync enabled
Apr 21 19:09:03 Linux122 systemd[1]: ntpd.service: main process exited, code=exite...RE
Apr 21 19:09:03 Linux122 systemd[1]: Unit ntpd.service entered failed state.
Apr 21 19:09:03 Linux122 systemd[1]: ntpd.service failed.
Hint: Some lines were ellipsized, use -l to show in full.
sync the clock manually with ntpdate, then restart the service:
[root@Linux122 clickhouse-server]# ntpdate -u ntp.api.bz
21 Apr 19:15:33 ntpdate[11392]: adjust time server 114.118.7.163 offset 0.134189 sec
[root@Linux122 clickhouse-server]# service ntpd start

2. Install Kudu
Configure the yum repository:
[root@Linux121 clickhouse-server]# cd /etc/yum.repos.d/
[root@Linux121 yum.repos.d]# ls
CentOS-Base.repo      CentOS-Debuginfo.repo  CentOS-Sources.repo        clickhouse.repo
CentOS-Base.repo.bak  CentOS-fasttrack.repo  CentOS-Vault.repo          local.repo
CentOS-CR.repo        CentOS-Media.repo      CentOS-x86_64-kernel.repo
[root@Linux121 yum.repos.d]# wget http://archive.cloudera.com/kudu/redhat/7/x86_64/kudu/cloudera-kudu.repo
--2022-04-21 22:09:37--  http://archive.cloudera.com/kudu/redhat/7/x86_64/kudu/cloudera-kudu.repo
Resolving archive.cloudera.com (archive.cloudera.com)... 151.101.72.167
Connecting to archive.cloudera.com (archive.cloudera.com)|151.101.72.167|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 321 [binary/octet-stream]
Saving to: ‘cloudera-kudu.repo’
100%[============================================================>] 321  --.-K/s  in 0s
2022-04-21 22:09:37 (8.11 MB/s) - ‘cloudera-kudu.repo’ saved [321/321]
[root@Linux121 yum.repos.d]# yum install -y kudu kudu-master kudu-tserver kudu-client0 kudu-client-devel
Deployment plan: Linux121 acts as the master; the other nodes are tablet servers.
On Linux121, edit /etc/default/kudu-master:
export FLAGS_rpc_bind_addresses=172.16.131.129:7051   # the master node's IP
On each of the other nodes, edit /etc/default/kudu-tserver to mark it as a tablet server; note that the port is 7050:
export FLAGS_rpc_bind_addresses=172.16.131.129:7050   # use each node's own IP here
The replica count (default 3) can also be changed: edit master.gflagfile on the master and tserver.gflagfile on the tablet servers.
vim /etc/kudu/conf/master.gflagfile   # add the line:
-default_num_replicas=1
On each tablet server, point it at the master:
[root@Linux123 default]# vim /etc/kudu/conf/tserver.gflagfile
--tserver_master_addrs=Linux121:7051
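Taken together, the two gflag files might look like the sketch below. Only -default_num_replicas and --tserver_master_addrs come from this guide; the --fs_wal_dir/--fs_data_dirs entries are illustrative storage-directory flags (the RPM packages ship defaults under /var/lib/kudu), shown so the files read as a whole.

```
# /etc/kudu/conf/master.gflagfile (on Linux121)
--fs_wal_dir=/var/lib/kudu/master        # illustrative packaging default
--fs_data_dirs=/var/lib/kudu/master      # illustrative packaging default
-default_num_replicas=1

# /etc/kudu/conf/tserver.gflagfile (on each tablet server)
--fs_wal_dir=/var/lib/kudu/tserver       # illustrative packaging default
--fs_data_dirs=/var/lib/kudu/tserver     # illustrative packaging default
--tserver_master_addrs=Linux121:7051
```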
Lift the process limits for the kudu user:
[root@Linux121 conf]# vim /etc/security/limits.d/20-nproc.conf   # the number in the filename may differ
kudu   soft   nproc   unlimited
impala soft   nproc   unlimited
Start Kudu
On the master node, start the master:
[root@Linux121 conf]# service kudu-master start
Started Kudu Master Server (kudu-master):                  [  OK  ]
The master web UI is now available at:
http://linux121:8051/masters
On each slave node, start the tserver:
[root@Linux121 conf]# service kudu-tserver start
Started Kudu Tablet Server (kudu-tserver):                 [  OK  ]
http://linux121:8051/tablet-servers
Installation is complete.

3. Kudu Java API

Create a table:
package kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import java.util.ArrayList;
import java.util.List;
public class TableDemo {
    public static void main(String[] args) {
        KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("Linux121");
        KuduClient client = builder.build();
        String tableName = "student1";
        // Define the schema: a primary-key column plus a value column
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        ColumnSchema id = new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build();
        ColumnSchema name = new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(false).build();
        columnSchemas.add(id);
        columnSchemas.add(name);
        Schema schema = new Schema(columnSchemas);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        createTableOptions.setNumReplicas(1);
        // Hash-partition on the key column into 3 tablets
        List<String> partitionRule = new ArrayList<>();
        partitionRule.add("id");
        createTableOptions.addHashPartitions(partitionRule, 3);
        try {
            client.createTable(tableName, schema, createTableOptions);
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
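The addHashPartitions(partitionRule, 3) call spreads rows across 3 tablets by hashing the key column, so sequential ids do not all land on one tablet. A rough plain-Java illustration of the idea (Kudu internally uses its own hash function, not Java's hashCode):

```java
import java.util.HashMap;
import java.util.Map;

public class HashBucketDemo {
    // Toy routing: map a key to one of numBuckets buckets by hashing.
    // Only illustrates the concept; Kudu's real partitioning hash differs.
    static int bucketFor(int id, int numBuckets) {
        return Math.floorMod(Integer.hashCode(id), numBuckets);
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int id = 1; id <= 9; id++) {
            counts.merge(bucketFor(id, 3), 1, Integer::sum);
        }
        // With 3 buckets the 9 keys spread evenly instead of piling up in one tablet
        System.out.println(counts);
    }
}
```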
Insert data:
package kudu;
import org.apache.kudu.client.*;
public class InsertData {
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder("Linux121")
                .defaultSocketReadTimeoutMs(5000).build();
        try {
            KuduTable student = kuduClient.openTable("student1");
            Insert insert = student.newInsert();
            insert.getRow().addInt("id", 1);
            insert.getRow().addString("name", "Tony");
            KuduSession kuduSession = kuduClient.newSession();
            // MANUAL_FLUSH buffers operations; nothing is sent until flush()
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            kuduSession.apply(insert);
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
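If the session's flush mode is left at its default (AUTO_FLUSH_SYNC), each apply() is sent immediately and no explicit flush() is needed; and newUpsert() can replace newInsert() so that re-running the program does not fail on a duplicate key. A sketch of that variant, reusing the host and table from the examples above:

```java
import org.apache.kudu.client.*;

public class UpsertDemo {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("Linux121").build();
        try {
            KuduTable table = client.openTable("student1");
            Upsert upsert = table.newUpsert();          // insert-or-update by primary key
            upsert.getRow().addInt("id", 1);
            upsert.getRow().addString("name", "Tony");
            KuduSession session = client.newSession();  // default mode: AUTO_FLUSH_SYNC
            session.apply(upsert);                      // sent immediately; no flush() needed
            session.close();
        } finally {
            client.close();
        }
    }
}
```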
Query:
package kudu;
import org.apache.kudu.client.*;
public class SelectData {
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder("Linux121")
                .defaultSocketReadTimeoutMs(5000).build();
        try {
            KuduTable student1 = kuduClient.openTable("student1");
            KuduScanner kuduScanner = kuduClient.newScannerBuilder(student1).build();
            // Scan results arrive in batches; iterate until the scanner is exhausted
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResults = kuduScanner.nextRows();
                for (RowResult rowResult : rowResults) {
                    int id = rowResult.getInt("id");
                    String name = rowResult.getString("name");
                    System.out.println("id = " + id + ", name = " + name);
                }
            }
            kuduScanner.close();
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
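The scanner above returns every row. Kudu can also push filters down to the tablet servers via KuduPredicate, avoiding client-side filtering; a sketch against the same table (the id >= 1 filter is just an illustrative condition):

```java
import org.apache.kudu.client.*;

public class SelectWithPredicate {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("Linux121").build();
        try {
            KuduTable table = client.openTable("student1");
            // Evaluate "id >= 1" on the tablet servers, not in the client
            KuduPredicate pred = KuduPredicate.newComparisonPredicate(
                    table.getSchema().getColumn("id"),
                    KuduPredicate.ComparisonOp.GREATER_EQUAL, 1);
            KuduScanner scanner = client.newScannerBuilder(table)
                    .addPredicate(pred)
                    .build();
            while (scanner.hasMoreRows()) {
                for (RowResult row : scanner.nextRows()) {
                    System.out.println("id = " + row.getInt("id")
                            + ", name = " + row.getString("name"));
                }
            }
            scanner.close();
        } finally {
            client.close();
        }
    }
}
```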
Update:
package kudu;
import org.apache.kudu.client.*;
public class UpdateData {
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder("Linux121")
                .defaultSocketReadTimeoutMs(5000).build();
        try {
            KuduTable student = kuduClient.openTable("student1");
            Update update = student.newUpdate();
            PartialRow row = update.getRow();
            row.addInt("id", 1);            // primary key of the row to update
            row.addString("name", "Bob");   // new value
            KuduSession kuduSession = kuduClient.newSession();
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            kuduSession.apply(update);
            kuduSession.flush();            // required in MANUAL_FLUSH mode
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
Delete data:
package kudu;
import org.apache.kudu.client.*;
public class DeleteData {
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder("Linux121")
                .defaultSocketReadTimeoutMs(5000).build();
        try {
            KuduTable student = kuduClient.openTable("student1");
            Delete delete = student.newDelete();
            PartialRow row = delete.getRow();
            row.addInt("id", 1);   // only the primary key is needed for a delete
            KuduSession kuduSession = kuduClient.newSession();
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            kuduSession.apply(delete);
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
Delete the table:
package kudu;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
public class DeleteTable {
    public static void main(String[] args) {
        KuduClient kuduClient = new KuduClient.KuduClientBuilder("Linux121")
                .defaultSocketReadTimeoutMs(5000).build();
        try {
            kuduClient.deleteTable("student1");
        } catch (KuduException e) {
            e.printStackTrace();
        } finally {
            try {
                kuduClient.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        }
    }
}
4. Sink Flink data to Kudu
package kudu;
public class Person {
    private int id;
    private String name;
    private int age;

    public Person() {
    }

    public Person(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
package kudu;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.Map;
public class FinkToKuduDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> personDataStreamSource = executionEnvironment.fromElements(
                new Person(1, "aaa", 18),
                new Person(2, "bbb", 19),
                new Person(3, "ccc", 20),
                new Person(4, "ddd", 21),
                new Person(5, "eee", 22));
        // The source text is truncated here; judging by the imports, each Person is
        // mapped to a Map of column values and handed to the custom Kudu sink:
        SingleOutputStreamOperator<Map<String, Object>> rows = personDataStreamSource
                .map(new MapFunction<Person, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(Person p) {
                        Map<String, Object> row = new HashMap<>();
                        row.put("id", p.getId());
                        row.put("name", p.getName());
                        row.put("age", p.getAge());
                        return row;
                    }
                });
        rows.addSink(new MySinkToKudu());
        executionEnvironment.execute("flink-to-kudu");
    }
}
package kudu;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.client.*;
import java.util.Map;
public class MySinkToKudu extends RichSinkFunction
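The MySinkToKudu listing above is cut off after the class declaration. Below is a minimal sketch of how such a RichSinkFunction could be completed, consistent with its imports: open a KuduClient once per task, upsert each incoming row, and close resources at shutdown. The host, table name, and column set (id/name/age) are assumptions; an "age" column would have to exist in the target table.

```java
package kudu;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.client.*;
import java.util.Map;

public class MySinkToKudu extends RichSinkFunction<Map<String, Object>> {
    private KuduClient client;
    private KuduTable table;
    private KuduSession session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One client/session per parallel sink instance
        client = new KuduClient.KuduClientBuilder("Linux121").build();
        table = client.openTable("student1");  // assumes a table with id/name/age columns
        session = client.newSession();
    }

    @Override
    public void invoke(Map<String, Object> row, Context context) throws Exception {
        // Upsert so that replays do not fail on duplicate keys
        Upsert upsert = table.newUpsert();
        upsert.getRow().addInt("id", (Integer) row.get("id"));
        upsert.getRow().addString("name", (String) row.get("name"));
        upsert.getRow().addInt("age", (Integer) row.get("age"));
        session.apply(upsert);
    }

    @Override
    public void close() throws Exception {
        session.close();
        client.close();
    }
}
```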



