From 1bfc61e1118f423062970b36488ee64c91d1577a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alberto=20Gonz=C3=A1lez=20Palomo?= Date: Mon, 12 Feb 2018 19:06:08 +0100 Subject: [PATCH] Improved named pipe capability, and spool bug fix. --- .../fetter/logstashforwarder/FileReader.java | 22 ++-- .../fetter/logstashforwarder/FileSigner.java | 17 +-- .../fetter/logstashforwarder/FileState.java | 18 ++- .../fetter/logstashforwarder/FileWatcher.java | 40 ++++-- .../logstashforwarder/util/LogFile.java | 39 ++++++ .../logstashforwarder/util/NamedPipe.java | 115 ++++++++++++++++++ .../util/RandomAccessFile.java | 8 +- 7 files changed, 223 insertions(+), 36 deletions(-) create mode 100644 src/main/java/info/fetter/logstashforwarder/util/LogFile.java create mode 100644 src/main/java/info/fetter/logstashforwarder/util/NamedPipe.java diff --git a/src/main/java/info/fetter/logstashforwarder/FileReader.java b/src/main/java/info/fetter/logstashforwarder/FileReader.java index 8ee23b6..d4e70ed 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileReader.java +++ b/src/main/java/info/fetter/logstashforwarder/FileReader.java @@ -19,11 +19,12 @@ */ import info.fetter.logstashforwarder.util.AdapterException; +import info.fetter.logstashforwarder.util.LogFile; import info.fetter.logstashforwarder.util.RandomAccessFile; +import info.fetter.logstashforwarder.util.NamedPipe; import java.io.File; import java.io.IOException; -//import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -75,11 +76,11 @@ private int readFile(FileState state, int spaceLeftInSpool) { long pointer = state.getPointer(); int numberOfEvents = 0; try { - if(state.isDeleted() || state.getRandomAccessFile() == null) { // Don't try to read this file + if(state.isDeleted() || state.getLogFile() == null) { // Don't try to read this file if(logger.isTraceEnabled()) { logger.trace("File : " + file + " has been deleted"); } - } else if(state.getRandomAccessFile().isEmpty()) { + } else if(state.getLogFile().isEmpty()) { if(logger.isTraceEnabled()) { logger.trace("File : " + file + " is empty"); } @@ -104,8 +105,10 @@ private int readFile(FileState state, int spaceLeftInSpool) { } private boolean isCompressedFile(FileState state) { - RandomAccessFile reader = state.getRandomAccessFile(); - + LogFile logFile = state.getLogFile(); + if (!(logFile instanceof RandomAccessFile)) return false; + RandomAccessFile reader = (RandomAccessFile) logFile; + try { for(byte[] magic : MAGICS) { byte[] fileBytes = new byte[magic.length]; @@ -143,14 +146,15 @@ private static void copyLineToBuffer(byte[] line, ByteBuffer byteBuffer) { } private long readLines(FileState state, int spaceLeftInSpool) { - RandomAccessFile reader = state.getRandomAccessFile(); + LogFile reader = state.getLogFile(); long pos = state.getPointer(); Multiline multiline = state.getMultiline(); + if (spaceLeftInSpool < 1) return pos; try { reader.seek(pos); byte[] line = readLine(reader); bufferedLines.clear(); - + if(multiline != null && multiline.isPrevious()) { spaceLeftInSpool--; } @@ -201,7 +205,7 @@ private long readLines(FileState state, int spaceLeftInSpool) { } } } - line = readLine(reader); + if (spaceLeftInSpool > 0) line = readLine(reader); } if(bufferedLines.position() > 0) { addEvent(state, pos, extractBytes(bufferedLines)); // send any buffered lines left @@ -213,7 +217,7 @@ private long readLines(FileState state, int spaceLeftInSpool) { return pos; } - private byte[] readLine(RandomAccessFile reader) throws IOException { + private byte[] readLine(LogFile reader) throws IOException { byteBuffer.clear(); int ch; boolean seenCR = false; diff --git a/src/main/java/info/fetter/logstashforwarder/FileSigner.java b/src/main/java/info/fetter/logstashforwarder/FileSigner.java index 8480f80..12e667e 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileSigner.java +++ b/src/main/java/info/fetter/logstashforwarder/FileSigner.java @@ -1,20 +1,23 @@ package info.fetter.logstashforwarder; -import info.fetter.logstashforwarder.util.RandomAccessFile; - import java.io.IOException; -//import java.io.RandomAccessFile; import java.util.zip.Adler32; +import info.fetter.logstashforwarder.util.LogFile; +import info.fetter.logstashforwarder.util.RandomAccessFile; + public class FileSigner { private static final Adler32 adler32 = new Adler32(); - - public static long computeSignature(RandomAccessFile file, int signatureLength) throws IOException { + + public static long computeSignature(LogFile logFile, int signatureLength) throws IOException { + if (!(logFile instanceof RandomAccessFile)) return 0; + + RandomAccessFile reader = (RandomAccessFile) logFile; adler32.reset(); byte[] input = new byte[signatureLength]; - file.seek(0); - file.read(input); + reader.seek(0); + reader.read(input); adler32.update(input); return adler32.getValue(); } diff --git a/src/main/java/info/fetter/logstashforwarder/FileState.java b/src/main/java/info/fetter/logstashforwarder/FileState.java index 8e69d57..c03e5a7 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileState.java +++ b/src/main/java/info/fetter/logstashforwarder/FileState.java @@ -17,7 +17,9 @@ * */ +import info.fetter.logstashforwarder.util.LogFile; import info.fetter.logstashforwarder.util.RandomAccessFile; +import info.fetter.logstashforwarder.util.NamedPipe; import java.io.File; import java.io.FileNotFoundException; @@ -47,7 +49,7 @@ public class FileState { @JsonIgnore private boolean changed = false; @JsonIgnore - private RandomAccessFile randomAccessFile; + private LogFile logFile; private long pointer = 0; @JsonIgnore private FileState oldFileState; @@ -67,7 +69,11 @@ public FileState(File file) throws IOException { this.file = file; directory = file.getCanonicalFile().getParent(); fileName = file.getName(); - randomAccessFile = new RandomAccessFile(file.getPath(), "r"); + if (file.isFile()) { + logFile = new RandomAccessFile(file.getPath(), "r"); + } else { + logFile = new NamedPipe(file); + } lastModified = file.lastModified(); size = file.length(); } @@ -75,7 +81,7 @@ public FileState(File file) throws IOException { private void setFileFromDirectoryAndName() throws FileNotFoundException { file = new File(directory + File.separator + fileName); if(file.exists()) { - randomAccessFile = null; + logFile = null; lastModified = file.lastModified(); size = file.length(); } else { @@ -141,8 +147,8 @@ public void setSignature(long signature) { this.signature = signature; } - public RandomAccessFile getRandomAccessFile() { - return randomAccessFile; + public LogFile getLogFile() { + return logFile; } public long getPointer() { @@ -172,7 +178,7 @@ public void setOldFileState(FileState oldFileState) { public void deleteOldFileState() { try { - oldFileState.getRandomAccessFile().close(); + oldFileState.getLogFile().close(); oldFileState = null; } catch(Exception e) {} } diff --git a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java index c60ab16..8f733f9 100644 --- a/src/main/java/info/fetter/logstashforwarder/FileWatcher.java +++ b/src/main/java/info/fetter/logstashforwarder/FileWatcher.java @@ -2,6 +2,7 @@ /* * Copyright 2015 Didier Fetter + * Copyright 2017 Alberto González Palomo https://sentido-labs.com * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -136,7 +137,7 @@ private void processModifications() throws IOException { logger.trace("Same signature size and value : file is the same"); continue; } else if(oldState.getSignatureLength() < state.getSignatureLength()){ - long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength()); + long signature = FileSigner.computeSignature(state.getLogFile(), oldState.getSignatureLength()); if(signature == oldState.getSignature()) { state.setOldFileState(oldState); logger.trace("Same signature : file is the same"); @@ -163,7 +164,7 @@ private void processModifications() throws IOException { logger.trace("Same signature size and value : file is the same"); break; } else if(otherState.getSignatureLength() < state.getSignatureLength()){ - long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength()); + long signature = FileSigner.computeSignature(state.getLogFile(), otherState.getSignatureLength()); if(signature == otherState.getSignature()) { state.setOldFileState(otherState); logger.trace("Same signature : file is the same"); @@ -194,7 +195,7 @@ private void processModifications() throws IOException { logger.debug("File " + state.getFile() + " has been replaced and not renamed, removing from watchMap"); } try { - oldState.getRandomAccessFile().close(); + oldState.getLogFile().close(); } catch(Exception e) {} oldWatchMap.remove(state.getFile()); } @@ -223,12 +224,33 @@ private void processModifications() throws IOException { removeMarkedFilesFromWatchMap(); } + // This filter will accept anything that is not a directory, + // including named pipes (FIFOs), sockets and device files. + // The standard org.apache.commons.io.filefilter.FileFileFilter excludes + // them even if their documentation says + // "This filter accepts Files that are files (not directories)." + protected class FileFileFilter implements IOFileFilter + { + @Override + public boolean accept(File file) { + return !file.isDirectory(); + } + + @Override + public boolean accept(File dir, String name) { + return accept(new File(dir, name)); + } + } + protected IOFileFilter fileFileFilter() { + return new FileFileFilter(); + } + private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception { logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath()); String directory = FilenameUtils.getFullPath(fileToWatch); String fileName = FilenameUtils.getName(fileToWatch); IOFileFilter fileFilter = FileFilterUtils.and( - FileFilterUtils.fileFileFilter(), + fileFileFilter(), FileFilterUtils.nameFileFilter(fileName), new LastModifiedFileFilter(deadTime)); initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter); @@ -240,7 +262,7 @@ private void addWildCardFiles(String filesToWatch, Event fields, long deadTime, String wildcard = FilenameUtils.getName(filesToWatch); logger.trace("Directory : " + new File(directory).getCanonicalPath() + ", wildcard : " + wildcard); IOFileFilter fileFilter = FileFilterUtils.and( - FileFilterUtils.fileFileFilter(), + fileFileFilter(), new WildcardFileFilter(wildcard), new LastModifiedFileFilter(deadTime)); initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter); @@ -273,7 +295,7 @@ private void addFileToWatchMap(Map map, File file, Event fields, state.setFields(fields); int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize()); state.setSignatureLength(signatureLength); - long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength); + long signature = FileSigner.computeSignature(state.getLogFile(), signatureLength); state.setSignature(signature); logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature); state.setMultiline(multiline); @@ -331,7 +353,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException { List markedList = null; for(File file : oldWatchMap.keySet()) { FileState state = oldWatchMap.get(file); - if(state.getRandomAccessFile() == null) { + if(state.getLogFile() == null) { state.setDeleted(); } if(state.isDeleted()) { @@ -342,7 +364,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException { markedList.add(file); } try { - state.getRandomAccessFile().close(); + state.getLogFile().close(); } catch(Exception e) {} } } @@ -358,7 +380,7 @@ public void close() throws IOException { logger.debug("Closing all files"); for(File file : oldWatchMap.keySet()) { FileState state = oldWatchMap.get(file); - state.getRandomAccessFile().close(); + state.getLogFile().close(); } } diff --git a/src/main/java/info/fetter/logstashforwarder/util/LogFile.java b/src/main/java/info/fetter/logstashforwarder/util/LogFile.java new file mode 100644 index 0000000..a5efb2a --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/util/LogFile.java @@ -0,0 +1,39 @@ +package info.fetter.logstashforwarder.util; + +/* + * Copyright 2018 Alberto González Palomo https://sentido-labs.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.log4j.Logger; + +public interface LogFile { + /** + * Check whether the file is empty: normal files are empty + * when their size is zero, but other kinds of files like + * named pipes / FIFOs do not report a size. + */ + public abstract boolean isEmpty() throws IOException; + public abstract void seek(long pos) throws IOException; + public abstract long getFilePointer() throws IOException; + public abstract int read() throws IOException; + public abstract void close() throws IOException; +} diff --git a/src/main/java/info/fetter/logstashforwarder/util/NamedPipe.java b/src/main/java/info/fetter/logstashforwarder/util/NamedPipe.java new file mode 100644 index 0000000..e978cb6 --- /dev/null +++ b/src/main/java/info/fetter/logstashforwarder/util/NamedPipe.java @@ -0,0 +1,115 @@ +package info.fetter.logstashforwarder.util; + +/* + * Copyright 2018 Alberto González Palomo https://sentido-labs.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.log4j.Logger; + +public class NamedPipe implements LogFile { + private static Logger logger = Logger.getLogger(NamedPipe.class); + + protected File file; + protected Pipe pipe; + + // We need to keep a cache of inputStreams because + // the input stream of a pipe can not be opened more than once: + // it will block and not return even after the previous stream + // has been closed. + protected static Map pipes = new HashMap(); + protected class Pipe { + public int refCount; + public FileInputStream inputStream; + public long bytesRead; + public Pipe() + { + refCount = 1; + inputStream = null; + bytesRead = 0; + } + } + + private NamedPipe() {} + + public NamedPipe(File file) + { + this.file = file; + synchronized (pipes) { + pipe = pipes.get(file); + if (pipe == null) { + pipe = new Pipe(); + pipes.put(file, pipe); + (this.new OpenerThread()).start(); + } else { + synchronized (pipe) { + ++pipe.refCount; + } + } + } + } + + protected class OpenerThread extends Thread + { + public void run() + { + try { + pipe.inputStream = new FileInputStream(file); + } catch (IOException e) { + logger.error("Error opening named pipe: " + e.getMessage()); + } + } + } + + public boolean isEmpty() throws IOException { return false; } + public void seek(long pos) throws IOException { } + public long getFilePointer() throws IOException { return pipe.bytesRead; } + + public int read() throws IOException + { + synchronized (pipe) { + if (pipe.inputStream != null && pipe.inputStream.available() > 0) { + ++pipe.bytesRead; + return pipe.inputStream.read(); + } else { + return -1; + } + } + } + + public void close() throws IOException + { + synchronized (pipe) { + synchronized (pipes) { + --pipe.refCount; + if (pipe.refCount == 0) { + pipe.inputStream.close(); + pipe.inputStream = null; + pipes.remove(file); + } + } + } + } + + @Override + protected void finalize() throws IOException { close(); } +} diff --git a/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java b/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java index 5219721..6213c97 100644 --- a/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java +++ b/src/main/java/info/fetter/logstashforwarder/util/RandomAccessFile.java @@ -66,7 +66,7 @@ * @see java.io.RandomAccessFile */ -public class RandomAccessFile implements DataInput, DataOutput { +public class RandomAccessFile implements DataInput, DataOutput, LogFile { static public final int BIG_ENDIAN = 0; static public final int LITTLE_ENDIAN = 1; @@ -166,7 +166,7 @@ static public long getDebugNbytes() { * write operation. */ protected long filePosition; - + /** * The buffer used for reading the data. */ @@ -453,7 +453,7 @@ public void order(int endian) { public FileDescriptor getFD() throws IOException { return (file == null) ? null : file.getFD(); } - + public boolean isEmpty() throws IOException { return length() == 0; } @@ -1738,5 +1738,3 @@ public boolean searchForward(KMPMatch match, int maxBytes) throws IOException { } } - -