
Parsing HDP aggregated log content: IFile and TFile


This tool parses aggregated YARN logs stored on HDFS. It consists of four classes in total. Package them into a jar, upload the jar to the server, download the log files from HDFS to the local machine, and run: java -jar <jar name> <log path>.
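For example, after building with mvn clean package, the assembly plugin configured in the pom at the end of this post should produce target/YarnLogFileReader-1.0-SNAPSHOT-jar-with-dependencies.jar (assuming Maven's default assembly naming), so a typical invocation is: java -jar YarnLogFileReader-1.0-SNAPSHOT-jar-with-dependencies.jar /tmp/application_1557457099458_0010, where the argument is the local directory holding the downloaded aggregated log files.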

Sample output (screenshot):

Code:

package YarnLogFileReader;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.file.tfile.BoundedRangeFileInputStream;
import org.apache.hadoop.io.file.tfile.Compression;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;

import java.io.*;
import java.util.*;

public class IndexedFormatLogReader implements LogReader {

    public void printContainerLogForFile(Path path, Configuration conf) throws Exception{

        Compression.Algorithm compressName = Compression.getCompressionAlgorithmByName("gz");
        Decompressor decompressor = compressName.getDecompressor();

        FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);
        FSDataInputStream fsDataInputStream = fileContext.open(path);
        FSDataInputStream fsDataInputStream1 = fileContext.open(path);
        long fileLength = fileContext.getFileStatus(path).getLen();
        // IFile layout: the file ends with a 32-byte UUID; the 4 bytes just before it hold the
        // length of the serialized IndexedLogsMeta block that precedes them.
        fsDataInputStream.seek(fileLength - 4L - 32L);
        int offset = fsDataInputStream.readInt();
        byte[] array = new byte[offset];
        fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);
        fsDataInputStream.readFully(array);

        LogAggregationIndexedFileController.IndexedLogsMeta logmeta = (LogAggregationIndexedFileController.IndexedLogsMeta) SerializationUtils.deserialize(array);
        Iterator iter = logmeta.getLogMetas().iterator();
        while(iter.hasNext()) {
            LogAggregationIndexedFileController.IndexedPerAggregationLogMeta perAggregationLogmeta = (LogAggregationIndexedFileController.IndexedPerAggregationLogMeta) iter.next();
            Iterator iter1 = new TreeMap(perAggregationLogmeta.getLogMetas()).entrySet().iterator();
            while(iter1.hasNext()) {
                Map.Entry log = (Map.Entry) iter1.next();
                Iterator iter2 = ((List) log.getValue()).iterator();
                InputStream in = null;
                while(iter2.hasNext()) {
                    LogAggregationIndexedFileController.IndexedFileLogMeta indexedFileLogmeta = (LogAggregationIndexedFileController.IndexedFileLogMeta) iter2.next();
                    // Each log file is stored as a gz-compressed byte range [startIndex, startIndex + fileCompressedSize) inside the aggregated file.
                    in = compressName.createDecompressionStream(new BoundedRangeFileInputStream(fsDataInputStream1, indexedFileLogmeta.getStartIndex(), indexedFileLogmeta.getFileCompressedSize()), decompressor, 262144);
                    StringBuilder sb = new StringBuilder();
                    String containerStr = String.format("Container: %s on %s", indexedFileLogmeta.getContainerId(), path.getName());
                    sb.append(containerStr + "\n");
                    sb.append("LogType: " + indexedFileLogmeta.getFileName() + "\n");
                    sb.append("LogLastModifiedTime: " + new Date(indexedFileLogmeta.getLastModifiedTime()) + "\n");
                    sb.append("LogLength: " + indexedFileLogmeta.getFileSize() + "\n");
                    sb.append("LogContents:\n");
                    BufferedReader br = new BufferedReader(new InputStreamReader(in));
                    System.out.println(sb.toString());
                    String line = null;
                    while((line = br.readLine()) != null) {
                        System.out.println(line);
                    }

                    System.out.printf("End of LogType: %sn", indexedFileLogmeta.getFileName());
                    System.out.printf("*****************************************************************************nn");
                }
            }
        }

        fsDataInputStream.close();
        fsDataInputStream1.close();

    }
}
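As a usage sketch, the indexed-format reader can also be driven from a small standalone class. The driver below is hypothetical (it is not one of the four classes of this tool) and the HDFS path is only an illustration of the expected input, a single aggregated node file:

package YarnLogFileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class IFileReadExample {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml / yarn-site.xml from the classpath, like the main tool does.
        Configuration conf = new YarnConfiguration();
        // Hypothetical path to one aggregated node file written in the indexed (IFile) format.
        Path path = new Path("hdfs:///app-logs/someuser/logs-ifile/application_1557457099458_0010/node_0");
        new IndexedFormatLogReader().printContainerLogForFile(path, conf);
    }
}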

package YarnLogFileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public interface LogReader {

    public void printContainerLogForFile(Path path, Configuration conf) throws Exception;

}
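Both IndexedFormatLogReader and TFileLogReader implement this interface; YarnLogFileReader.probeFileFormat (further below) inspects each file and returns whichever implementation matches its format.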
package YarnLogFileReader;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
import org.apache.hadoop.yarn.util.Times;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

public class TFileLogReader implements LogReader {

    @Override
    public void printContainerLogForFile(Path path, Configuration conf) throws Exception {

        try {
            AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);
            AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
            FileContext context = FileContext.getFileContext(path.toUri(), conf);
            FileStatus status = context.getFileStatus(path);
            long size = status.getLen();
            byte[] buf = new byte[65535];  // copy buffer for streaming log contents

            // Each TFile key is a container id; its value stream holds a sequence of
            // (log file name, declared length, raw bytes) records until EOF.
            DataInputStream valueStream = reader.next(key);
            while (valueStream != null) {
                while (true) {
                    try {
                        String fileType = valueStream.readUTF();
                        String fileLengthStr = valueStream.readUTF();
                        long fileLength = Long.parseLong(fileLengthStr);
                        LogToolUtils.outputContainerLog(key.toString(), path.getName(), fileType, fileLength, size, Times.format(status.getModificationTime()), valueStream, (OutputStream) System.out, buf, ContainerLogAggregationType.AGGREGATED);
                        byte[] b = this.aggregatedLogSuffix(fileType).getBytes(Charset.forName("UTF-8"));
                        ((OutputStream) System.out).write(b, 0, b.length);
                    } catch (EOFException eofException) {
                        break;
                    }
                }
                // Advance to the next container entry in the aggregated file.
                key = new AggregatedLogFormat.LogKey();
                valueStream = reader.next(key);
            }
        }catch(IOException ioe) {
            if("Not a valid BCFile."
                    .equals(ioe.getMessage())) {
                return;
            } else
                throw ioe;
        }

    }

    private String aggregatedLogSuffix(String fileName) {
        StringBuilder sb = new StringBuilder();
        String endOfFile = "End of LogType:" + fileName;
        sb.append("n" + endOfFile + "n");
        sb.append(StringUtils.repeat("*", endOfFile.length() + 50) + "nn");
        return sb.toString();
    }

}
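A note on the design: in the TFile layout each key is a container id and its value stream is a sequence of (log file name, declared length, raw bytes) records, so the inner loop simply reads until readUTF throws EOFException. Files that are not TFiles surface as the "Not a valid BCFile." IOException, which this reader treats as nothing to print.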

package YarnLogFileReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;

import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;


import java.security.SecureRandom;

public class YarnLogFileReader
{

    private static List list = new ArrayList();

    private static Configuration conf = new YarnConfiguration();

    private static final SecureRandom RAN = new SecureRandom();

    static {
        conf.set("fs.AbstractFileSystem.wasb.impl", "org.apache.hadoop.fs.azure.Wasb");
        conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs");
        conf.set("fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        conf.set("fs.wasbs.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
        conf.set("fs.abfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem");
        conf.set("fs.abfss.impl", "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem");
        conf.set("fs.AbstractFileSystem.abfs.impl", "org.apache.hadoop.fs.azurebfs.Abfs");
        conf.set("fs.AbstractFileSystem.abfss.impl", "org.apache.hadoop.fs.azurebfs.Abfss");
//        conf.addResource("D:\xxd\code-20211031\YarnLogFileReader-master\src\core-site.xml");
//        conf.addResource("D:\xxd\code-20211031\YarnLogFileReader-master\src\hdfs-site.xml");
//        conf.addResource("D:\xxd\code-20211031\YarnLogFileReader-master\src\yarn-site.xml");
    }

    public static void main( String[] args ) throws Exception
    {
        if(args.length != 1) {
            System.out.println("Usage: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader " );
            System.out.println("Example: java -classpath '/etc/hadoop/conf:./target/YarnLogFileReader-1.0-SNAPSHOT-dependencies.jar:/usr/hdp/current/hadoop-hdfs-client/lib/adls2-oauth2-token-provider.jar' YarnLogFileReader.YarnLogFileReader wasb://lazhuhdi-2019-05-09t07-12-39-811z@lzlazhuhdi.blob.core.windows.net//app-logs/chenghao.guo/logs-ifile/application_1557457099458_0010");
            System.exit(1);
        }

        try {
            InetAddress headnodehost = InetAddress.getByName("headnodehost");
        } catch(UnknownHostException ex) {
            System.out.println("Not running on cluster");
            conf.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");
            conf.set("fs.adls.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");
            conf.set("fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl");
            conf.set("fs.AbstractFileSystem.adls.impl", "org.apache.hadoop.fs.adl.Adl");
            YarnLogFileReader app = new YarnLogFileReader(false, args[0]);
            app.printAllContainerLog(args[0]);
            System.exit(0);
        }

        YarnLogFileReader app = new YarnLogFileReader(true, "");
        app.printAllContainerLog(args[0]);

    }

    public YarnLogFileReader(boolean isCluster, String path) throws IOException {

        if (!isCluster) {

            Console console = System.console();
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

            int schemeIndex;
            if ((schemeIndex = path.indexOf("://")) != -1) {
                String scheme = path.substring(0, schemeIndex);
                scheme = "rgetebhethnnr";
                
                switch (scheme) {
                    case "wasb":
                    case "wasbs":
                    case "abfs":
                    case "abfss":
                        if("wasb".equals(scheme) || "wasbs".equals("scheme"))
                            System.out.println("Scheme is blob storage");
                        else
                            System.out.println("Scheme is adls gen2");
                        String accountName = path.substring(path.indexOf("@")+1, path.indexOf("/", schemeIndex+3));
                        System.out.printf("Storage key (%s):", accountName);
                        char[] storageKeyChars = console.readPassword();
                        String storageKey = new String(storageKeyChars);

                        conf.set("fs.azure.account.key."+accountName, storageKey);
                        conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex+3)));
                        break;

                    case "adl":
                        System.out.println("Scheme is adls gen1");

                        String adlsAccountName = path.substring(schemeIndex+3, path.indexOf("/", schemeIndex+3));

                        System.out.printf("Client ID (%s): ", adlsAccountName);
                        String clientId = reader.readLine();
                        System.out.printf("Client Secret (%s): ", adlsAccountName);
                        char[] clientSecretChars = console.readPassword();
                        String clientSecret = new String(clientSecretChars);
                        System.out.printf("Tenant ID (%s): ", adlsAccountName);
                        String tenantId = reader.readLine();

                        conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");
                        conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/"+tenantId+"/oauth2/token");
                        conf.set("dfs.adls.oauth2.client.id", clientId);
                        conf.set("dfs.adls.oauth2.credential", clientSecret);
                        conf.set("fs.defaultFS", path.substring(0, path.indexOf("/", schemeIndex+3)));
                        break;

                    default:
                        conf.set("fs.defaultFS", "file:///");
                        conf.set("fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs");
                        System.out.println("Try local file system");
                }
            } else {

                System.out.print("Type scheme (wasb, wasbs, abfs, abfss, adl):");
                //String scheme = reader.readLine();
                String scheme = "rgetebhethnnr";   // 跳过不管

                 switch (scheme) {
                    case "wasb":
                    case "wasbs":
                    case "abfs":
                    case "abfss":

                        if("wasb".equals(scheme) || "wasbs".equals(scheme))
                            System.out.println("Scheme is blob storage");
                        else
                            System.out.println("Scheme is adls gen2");
                        System.out.print("Storage Account Name:");
                        String accountName = reader.readLine();
                        accountName = resolveAccountName(accountName, scheme);
                        System.out.printf("Container Name (%s): ", accountName);
                        String containerName = reader.readLine();
                        System.out.printf("Storage key (%s): ", accountName);
                        char[] storageKeyChars = console.readPassword();
                        String storageKey = new String(storageKeyChars);

                        if("wasb".equals(scheme) || "wasbs".equals(scheme)) {
                            conf.set("fs.defaultFS", scheme + "://" + containerName + "@" + accountName);
                            conf.set("fs.azure.account.key." + accountName, storageKey);
                        } else {
                            conf.set("fs.defaultFS", scheme + "://" + containerName + "@" + accountName);
                            conf.set("fs.azure.account.key." + accountName, storageKey);
                        }

                        break;
                    case "adl":
                    case "adls":

                        System.out.println("Scheme is adls gen1");

                        System.out.print("Data Lake Account Name:");
                        String adlsAccountName = reader.readLine();
                        adlsAccountName = resolveAccountName(adlsAccountName, scheme);
                        System.out.printf("Client ID (%s): ", adlsAccountName);
                        String clientId = reader.readLine();
                        System.out.printf("Client Secret (%s): ", adlsAccountName);
                        char[] clientSecretChars = console.readPassword();
                        String clientSecret = new String(clientSecretChars);
                        System.out.printf("Tenant ID (%s): ", adlsAccountName);
                        String tenantId = reader.readLine();

                        conf.set("fs.defaultFS", scheme + "://" + adlsAccountName);
                        conf.set("dfs.adls.oauth.access.token.provider.type", "ClientCredential");
                        conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/"+tenantId+"/oauth2/token");
                        conf.set("dfs.adls.oauth2.client.id", clientId);
                        conf.set("dfs.adls.oauth2.credential", clientSecret);

                        break;

                    default:

                        conf.set("fs.defaultFS", "file:///");

                }
            }
        }
    }

    private String resolveAccountName(String accountName, String scheme) {

        if(accountName.indexOf(".") != -1)
            accountName = accountName.substring(0, accountName.indexOf("."));
        switch(scheme) {
            case "wasb":
            case "wasbs":
                accountName += ".blob.core.windows.net";
                break;
            case "abfs":
            case "abfss":
                accountName += ".dfs.core.windows.net";
                break;
            case "adl":
                accountName += ".azuredatalakestore.net";
                break;
        }

        return accountName;
    }

    private void printAllContainerLog(String file) throws Exception {

        List result = getAllFiles(new Path(file));
        if(result.size() == 0) {
            System.out.println("No file found");
            System.exit(0);
        }
        for(int i = 0; i < result.size(); i++) {
            printContainerLogForFile((Path) result.get(i));
        }
    }

    private void printContainerLogForFile(Path path) throws Exception {
        try {
            LogReader logReader = probeFileFormat(path);
            logReader.printContainerLogForFile(path, conf);
        } catch(Exception ex) {
            // Files that cannot be recognized as aggregated log files are skipped.
            return;
        }
    }

    private LogReader probeFileFormat(Path path) throws Exception {
        FileContext fileContext = FileContext.getFileContext(path.toUri(), conf);
        FSDataInputStream fsDataInputStream = fileContext.open(path);
        long fileLength = fileContext.getFileStatus(path).getLen();
        try {
            // Probe for the indexed (IFile) format: the file ends with a 32-byte UUID preceded by a
            // 4-byte int holding the length of the serialized meta block. An implausible length
            // (negative or >= 10 MB) means this is not an IFile and we fall through to the TFile probe.
            fsDataInputStream.seek(fileLength - 4L - 32L);
            int offset = fsDataInputStream.readInt();
            if(offset >= 10485760)
                    throw new Exception();
            byte[] array = new byte[offset];
            fsDataInputStream.seek(fileLength - (long) offset - 4L - 32L);
            fsDataInputStream.close();
            return new IndexedFormatLogReader();
        } catch (Exception eofex) {

            try {
                AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(conf, path);
                return new TFileLogReader();
            } catch(Exception ex) {
                System.out.printf("The file %s is not an indexed formatted log filen", path.toString());
                throw ex;
            }
        }
    }

    private List getAllFiles(Path path) throws Exception {

        try {
            FileSystem fs = FileSystem.newInstance(conf);
            if (!fs.getFileStatus(path).isDirectory())
                list.add(path);
            else {
                FileStatus[] files = fs.listStatus(path);
                for (int i = 0; i < files.length; i++) {
                    if (files[i].isDirectory())
                        getAllFiles(files[i].getPath());
                    else
                        list.add(files[i].getPath());
                }
            }
            return list;
        } catch (AzureException ex) {
            System.out.println("Unable to initialize the filesystem or unable to list file status, please check input parameters");
            throw ex;
        }
    }

}
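Two behaviors of the driver are worth noting. Resolving the host name headnodehost is used as the test for whether the tool is running on an HDInsight cluster node; on the cluster the existing Hadoop configuration is used as-is. Off the cluster, this modified version deliberately replaces the entered scheme with a nonsense string so that the Azure credential prompts are skipped and the tool falls back to the local file system, which matches the workflow described at the top: download the aggregated log files locally and point the jar at them.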




pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.microsoft.css.apacbigdata</groupId>
  <artifactId>YarnLogFileReader</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>YarnLogFileReader</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-azure-datalake</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>YarnLogFileReader.YarnLogFileReader</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>