材料公司在使用Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。
因此目前有如下几个相关的需求:可以通过Web接口,传递参数,启动一个Seatunnel应用可以自定义日志,收集相关指标,目前想到的包括:应用的入流量、出流量;启动时间、结束时间等在任务结束后,可以用applicationId自动从yarn上收集日志(一是手动收集太麻烦,二是时间稍长日志就没了)
- Seatunnel:2.0.5
目前官方2版本还没有正式发布,只能自己下载源码编译。
从Github下载官方源码,clone到本地Idea
github:https://github.com/apache/incubator-seatunnel
官方地址:http://seatunnel.incubator.apache.org/
Idea下方Terminal命令行里,maven打包,执行:mvn clean install -Dmaven.test.skip=true
打包过程大约十几分钟,执行结束后,seatunnel-dist模块的target目录下,可以找到打好包的*.tar.gz压缩安装包
- Spark:2.4.8Hadoop:2.7
导读Seatunnel源码解析(1)-启动应用
Seatunnel源码解析(2)-加载配置文件
Seatunnel源码解析(3)-加载插件
Seatunnel源码解析(4) -启动Spark/Flink程序
Seatunnel源码解析(5)-修改启动LOGO
修改LOGO本章将修改Seatunnel源码,修改Seatunnel的启动LOGO
entryPoint
public class Seatunnel {
...
private static void entryPoint(String configFile, Engine engine) throws Exception {
...
// 打印应用启动LOGO
showAsciiLogo();
...
}
private static void showAsciiLogo() {
String printAsciiLogo = System.getenv("SEATUNNEL_PRINT_ASCII_LOGO");
if ("true".equalsIgnoreCase(printAsciiLogo)) {
AsciiArtUtils.printAsciiArt(Constants.LOGO);
}
}
}
public final class Constants {
public static final String ROW_ROOT = "__root__";
public static final String ROW_TMP = "__tmp__";
public static final String ROW_JSON = "__json__";
// public static final String LOGO = "SeaTunnel";
public static final String LOGO = "^o^";
private Constants() {
}
}
public final class AsciiArtUtils {
...
public static void printAsciiArt(String str) {
final int width = 144;
final int height = 32;
BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
Graphics g = image.getGraphics();
g.setFont(new Font("Dialog", Font.PLAIN, FONT_SIZE));
Graphics2D graphics = (Graphics2D) g;
graphics.setRenderingHint(RenderingHints.KEY_TEXT_ANTIALIASING,
RenderingHints.VALUE_TEXT_ANTIALIAS_ON);
graphics.drawString(str, DRAW_X, FONT_SIZE);
for (int y = 0; y < height; y++) {
StringBuilder sb = new StringBuilder();
for (int x = 0; x < width; x++) {
sb.append(image.getRGB(x, y) == RGB ? " " : image.getRGB(x, y) == -1 ? "#" : "*");
}
if (sb.toString().trim().isEmpty()) {
continue;
}
LOGGER.info(String.valueOf(sb));
}
}
}
测试


