本文实例为大家分享了读取用户登入出日志并上传服务端的具体实现代码,供大家参考,具体内容如下
该客户端运行在给用户提供unix服务的服务器上。用来读取并收集该服务器上用户的上下线信息,并进行配对整理后发送给服务端汇总。
具体实现代码:
1. DMSServer.java
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.linkedBlockingQueue;
import org.dom4j.document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
public class DMSServer {
//属性定义
//用来接收客户端连接的服务端的ServerSocket
private ServerSocket server;
//用来管理处理客户端请求的线程的线程池
private ExecutorService threadPool;
//保存所有客户端发送过来配对日志的文件
private File serverLogFile;
//消息队列
private BlockingQueue messageQueue = new linkedBlockingQueue();
public DMSServer() throws Exception{
try {
System.out.println("服务端正在初始化...");
//1 解析配置文件server-config.xml
Map config = loadConfig();
//2 根据配置文件内容初始化属性
init(config);
System.out.println("服务端初始化完毕...");
} catch (Exception e) {
System.out.println("初始化服务端失败!");
throw e;
}
}
private Map loadConfig() throws Exception{
try {
SAXReader reader = new SAXReader();
document doc
= reader.read(new File("server-config.xml"));
Element root = doc.getRootElement();
Map config
= new HashMap();
List list = root.elements();
for(Element e : list){
String key = e.getName();
String value = e.getTextTrim();
config.put(key, value);
}
return config;
} catch (Exception e) {
System.out.println("解析配置文件异常!");
e.printStackTrace();
throw e;
}
}
private void init(Map config) throws Exception{
this.server = new ServerSocket(
Integer.parseInt(config.get("serverport"))
);
this.serverLogFile = new File(
config.get("logrecfile")
);
this.threadPool = Executors.newFixedThreadPool(
Integer.parseInt(config.get("threadsum"))
);
}
public void start() throws Exception{
try {
System.out.println("服务端开始工作...");
SaveLogHandler slh=new SaveLogHandler();
new Thread(slh).start();
while(true){
Socket socket=server.accept();
threadPool.execute(new ClientHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
public static void main(String[] args) {
try {
DMSServer server = new DMSServer();
server.start();
} catch (Exception e) {
System.out.println("启动服务端失败!");
}
}
private class SaveLogHandler implements Runnable{
public void run(){
PrintWriter pw = null;
try {
pw = new PrintWriter(
new FileOutputStream(
serverLogFile,true
)
);
while(true){
if(messageQueue.size()>0){
pw.println(messageQueue.poll());
}else{
pw.flush();
Thread.sleep(500);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally{
if(pw != null){
pw.close();
}
}
}
}
private class ClientHandler implements Runnable{
private Socket socket;
public ClientHandler(Socket socket){
this.socket = socket;
}
public void run(){
PrintWriter pw = null;
try {
//1
pw = new PrintWriter(
new OutputStreamWriter(
socket.getOutputStream(),"UTF-8"
)
);
//2
BufferedReader br = new BufferedReader(
new InputStreamReader(
socket.getInputStream(),"UTF-8"
)
);
//3
String message = null;
while((message = br.readLine())!=null){
if("OVER".equals(message)){
break;
}
//将该日志写入文件保存
messageQueue.offer(message);
}
//4
pw.println("OK");
pw.flush();
} catch (Exception e) {
e.printStackTrace();
pw.println("ERROR");
pw.flush();
} finally{
try {
//与客户端断开连接释放资源
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2. DMSClient.java
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.dom4j.document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import com.dms.bo.LogData;
import com.dms.bo.LogRec;
public class DMSClient {
//属性定义
//第一步:解析日志所需属性
//unix系统日志文件
private File logFile;
//保存解析后日志的文件
private File textLogFile;
//书签文件
private File lastPositionFile;
//每次解析日志的条目数
private int batch;
//第二步:配对日志所需要属性
//保存配对日志的文件
private File logRecFile;
//保存未配对日志的文件
private File loginLogFile;
//第三步:发送日志所需要属性
//服务端地址
private String serverHost;
//服务端端口
private int serverPort;
public DMSClient() throws Exception{
try {
//1 解析配置文件config.xml
Map config = loadConfig();
//打桩
System.out.println(config);
//2 根据配置文件内容初始化属性
init(config);
} catch (Exception e) {
System.out.println("初始化失败!");
throw e;
}
}
private void init(Map config) throws Exception{
try {
logFile = new File(
config.get("logfile")
);
textLogFile = new File(
config.get("textlogfile")
);
lastPositionFile = new File(
config.get("lastpositionfile")
);
batch = Integer.parseInt(
config.get("batch")
);
logRecFile = new File(
config.get("logrecfile")
);
loginLogFile = new File(
config.get("loginlogfile")
);
serverHost = config.get("serverhost");
serverPort = Integer.parseInt(
config.get("serverport")
);
} catch (Exception e) {
System.out.println("初始化属性失败!");
e.printStackTrace();
throw e;
}
}
private Map loadConfig() throws Exception{
try {
SAXReader reader = new SAXReader();
document doc
= reader.read(new File("config.xml"));
Element root = doc.getRootElement();
Map config
= new HashMap();
List list = root.elements();
for(Element e : list){
String key = e.getName();
String value = e.getTextTrim();
config.put(key, value);
}
return config;
} catch (Exception e) {
System.out.println("解析配置文件异常!");
e.printStackTrace();
throw e;
}
}
public void start(){
parseLogs();
matchLogs();
sendLogs();
// while(true){
// //解析日志
// if(!parseLogs()){
//continue;
// }
// //配对日志
// if(!matchLogs()){
//continue;
// }
// //发送日志
// sendLogs();
// }
}
private boolean sendLogs(){
Socket socket = null;
try {
//1
if(!logRecFile.exists()){
System.out.println(logRecFile+"不存在!");
return false;
}
//2
List matches
= IOUtil.loadLogRec(logRecFile);
//3
socket = new Socket(serverHost,serverPort);
//4
PrintWriter pw = new PrintWriter(
new OutputStreamWriter(
socket.getOutputStream(),"UTF-8"
)
);
//5
for(String log : matches){
pw.println(log);
}
//6
pw.println("OVER");
pw.flush();
//7
BufferedReader br = new BufferedReader(
new InputStreamReader(
socket.getInputStream(),"UTF-8"
)
);
//8
String response = br.readLine();
//9
if("OK".equals(response)){
logRecFile.delete();
return true;
}else{
System.out.println("发送日志失败!");
return false;
}
} catch (Exception e) {
System.out.println("发送日志失败!");
e.printStackTrace();
} finally{
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return false;
}
private boolean matchLogs(){
try {
//1
//1.1
if(logRecFile.exists()){
return true;
}
//1.2
if(!textLogFile.exists()){
System.out.println(textLogFile+"不存在!");
return false;
}
//2
List list
= IOUtil.loadLogData(textLogFile);
//3
if(loginLogFile.exists()){
list.addAll(
IOUtil.loadLogData(loginLogFile)
);
}
//4
//4.1
List matches
= new ArrayList();
//4.2
Map loginMap
= new HashMap();
Map logoutMap
= new HashMap();
//4.3
for(LogData logdata: list){
String key = logData.getUser()+","+
logData.getPid();
if(logData.getType()==LogData.TYPE_LOGIN){
loginMap.put(key, logData);
}else if(logData.getType()==LogData.TYPE_LOGOUT){
logoutMap.put(key, logData);
}
}
//4.4
Set> entrySet
= logoutMap.entrySet();
for(Entry e : entrySet){
LogData logout = e.getValue();
LogData login = loginMap.remove(e.getKey());
LogRec logRec = new LogRec(login,logout);
matches.add(logRec);
}
//5
IOUtil.saveCollection(matches, logRecFile);
//6
IOUtil.saveCollection(
loginMap.values(),loginLogFile
);
//7
textLogFile.delete();
//8
return true;
} catch (Exception e) {
System.out.println("配对日志失败!");
e.printStackTrace();
}
return false;
}
private boolean parseLogs(){
RandomAccessFile raf = null;
try {
//1
//1.1
if(textLogFile.exists()){
return true;
}
//1.2
if(!logFile.exists()){
System.out.println(logFile+"不存在!");
return false;
}
//1.3
long lastPosition = hasLogs();
//打桩
// System.out.println(
// "lastPosition:"+lastPosition
// );
if(lastPosition<0){
System.out.println("没有日志可以解析了!");
return false;
}
//2
raf = new RandomAccessFile(logFile,"r");
//3
raf.seek(lastPosition);
//4
List list
= new ArrayList();
for(int i=0;i=LogData.LOG_LENGTH){
return lastPosition;
}
} catch (Exception e) {
e.printStackTrace();
}
return -1;
}
public static void main(String[] args) {
try {
DMSClient client = new DMSClient();
client.start();
} catch (Exception e) {
System.out.println("客户端运行失败!");
}
}
}
3. IOUtil.java
package com.dms;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.dms.bo.LogData;
public class IOUtil {
public static List loadLogRec(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
List list
= new ArrayList();
String line = null;
while((line = br.readLine())!=null){
list.add(line);
}
return list;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br != null){
br.close();
}
}
}
public static List loadLogData(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
List list = new ArrayList();
String line = null;
while((line = br.readLine())!=null){
LogData logData = new LogData(line);
list.add(logData);
}
return list;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br!=null){
br.close();
}
}
}
public static void saveLong(
long lon,File file) throws Exception{
PrintWriter pw = null;
try {
pw = new PrintWriter(file);
pw.println(lon);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(pw != null){
pw.close();
}
}
}
public static void saveCollection(
Collection c,File file) throws Exception{
PrintWriter pw = null;
try {
pw = new PrintWriter(file);
for(Object o : c){
pw.println(o);
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(pw != null){
pw.close();
}
}
}
public static String readString(
RandomAccessFile raf,int length) throws Exception{
try {
byte[] data = new byte[length];
raf.read(data);
return new String(data,"ISO8859-1");
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
public static long readLong(File file) throws Exception{
BufferedReader br = null;
try {
br = new BufferedReader(
new InputStreamReader(
new FileInputStream(
file
)
)
);
String line = br.readLine();
return Long.parseLong(line);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally{
if(br != null){
br.close();
}
}
}
}
4. config.xml
wtmpx log.txt last-position.txt 10 logrec.txt login.txt localhost 8088
5. server-config.xml
server-logs.txt 30 8088
以上就是本文的全部内容,希望对大家的学习有所帮助。



