【Flume】TailDirSource源码理解
TaildirSource类图如下(列出主要类)
TailDirSource类
TailDirSource继承了AbstractSource类,而AbstractSource类中channelProcessor属性负责将Source中的Event提交给Channel组件
TailDirSource类通过配置参数匹配日志文件,获取日志文件更新内容并且将已经读取的偏移量记录到特定的文件当中(position file)
configure()方法:
1.判断从配置文件加载的配置是否合法,其中包括了对filegroups,以及以filegroups为单位的文件路径是否存在等条件。
2.对batchSize,skipToEnd,writePosInterval,idleTimeout等变量进行初始化工作
batchSize定义了往Channel中发送Event的批量处理大小
skipToEnd定义了每次程序启动,对文件进行读取的时候,是否从文件尾部开始读取数据,或者从文件最开始读取。
writePosInterval,TaildirSource读取每个监控文件都在位置文件中记录监控文件的已经读取的偏移量,writePosInterval则是定义了更新位置文件的间隔。
idleTimeout日志文件在idleTimeout间隔时间,没有被修改,文件将被关闭
start()方法:
通过configure()初始化后的变量创建了ReliableTaildirEventReader对象,同时创建两个线程池idleFileChecker和positionWriter,分别用于监控日志文件和记录日志文件读取的偏移量。
idleFileChecker实现一个Runnable接口,遍历reader所有监控的文件,检查文件最后修改时间+idleTimeout是否小于当前时间,说明日志文件在idleTimeout时间内没有被修改,该文件将被关闭。
private class idleFileCheckerRunnable implements Runnable { @Override public void run() { try { long now = System.currentTimeMillis(); for (TailFile tf : reader.getTailFiles().values()) { if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) { idleInodes.add(tf.getInode()); } } } catch (Throwable t) { logger.error("Uncaught exception in IdleFileChecker thread", t); } }}
positionWriter主要作用是记录日志文件读取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系统中特有属性,在适应其他系统(Windows等)日志采集时ReliableTaildirEventReader.getInode()方法需要修改(注意:在利用Linux系统上inode实现上,文件是通过inode记录日志读取偏移量。所以即使文件名改变了,也不影响日志读取,在我实现Window版本上,只采用了文件名对应日志读取偏移量,文件名改变影响日志读取
)。pos则是记录的日志读取的偏移量,file记录了日志文件的路径
process()方法:
process方法记录了TailDirSource类中主要的逻辑,获取每个监控的日志文件,调用tailFileProcess获取每个日志文件的更新数据,并将每条记录转换为Event(具体细节要看ReliableTaildirEventReader的readEvents方法)
public Status process() { Status status = Status.READY; try { existingInodes.clear(); existingInodes.addAll(reader.updateTailFiles()); for (long inode : existingInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { tailFileProcess(tf, true); } } closeTailFiles(); try { TimeUnit.MILLISECONDS.sleep(retryInterval); } catch (InterruptedException e) { logger.info("Interrupted while sleeping"); } } catch (Throwable t) { logger.error("Unable to tail files", t); status = Status.BACKOFF; } return status;}
ReliableTaildirEventReader类
构造ReliableTaildirEventReader对象的时候,首先会判断各种必须参数是否合法等,然后加载position file获取每个文件上次记录的日志文件读取的偏移量
loadPositionFile(String filePath) 不粘贴方法的具体代码,主要就是获取每个监控日志文件的读取偏移量
readEvents()的各个不同参数方法中,下面这个是最主要的,该方法获取当前日志文件的偏移量,调用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法将日志文件每行转换为Flume的消息对象Event,并循环将每个event添加header信息。
public List readEvents(int numEvents, boolean backoffWithoutNL) throws IOException { if (!committed) { if (currentFile == null) { throw new IllegalStateException("current file does not exist. " + currentFile.getPath()); } logger.info("Last read was never committed - resetting position"); long lastPos = currentFile.getPos(); currentFile.updateFilePos(lastPos); } List events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset); if (events.isEmpty()) { return events; } Map headers = currentFile.getHeaders(); if (annotateFileName || (headers != null && !headers.isEmpty())) { for (Event event : events) { if (headers != null && !headers.isEmpty()) { event.getHeaders().putAll(headers); } if (annotateFileName) { event.getHeaders().put(fileNameHeader, currentFile.getPath()); } } } committed = false; return events;}
openFile(File file, Map
TailFile类
TaildirSource通过TailFile类操作处理每个日志文件,包含了RandomAccessFile类,以及记录日志文件偏移量pos,最新更新时间lastUpdated等属性
RandomAccessFile完美的符合TaildirSource的应用场景,RandomAccessFile支持使用seek()方法随机访问文件,配合position file中记录的日志文件读取偏移量,能够轻松简单的seek到文件偏移量,然后向后读取日志内容,并重新将新的偏移量记录到position file中。
readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下图描述了该方法的调用层级,readEvent简单的理解就是将每行日志转为Event消息体,方法最终调用的是readFile()方法。
readLine()方法,有点难还在研究
public LineResult readLine() throws IOException { LineResult lineResult = null; while (true) { if (bufferPos == NEED_READING) { if (raf.getFilePointer() < raf.length()) {//当文件指针位置小于文件总长度的时候,就需要读取指针位置到文件最后的数据 readFile(); } else { if (oldBuffer.length > 0) { lineResult = new LineResult(false, oldBuffer); oldBuffer = new byte[0]; setLineReadPos(lineReadPos + lineResult.line.length); } break; } } for (int i = bufferPos; i < buffer.length; i++) { if (buffer[i] == BYTE_NL) { int oldLen = oldBuffer.length; // Don't copy last byte(NEW_LINE) int lineLen = i - bufferPos; // For windows, check for CR if (i > 0 && buffer[i - 1] == BYTE_CR) { lineLen -= 1; } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) { oldLen -= 1; } lineResult = new LineResult(true, concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen)); setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1))); oldBuffer = new byte[0]; if (i + 1 < buffer.length) { bufferPos = i + 1; } else { bufferPos = NEED_READING; } break; } } if (lineResult != null) { break; } // NEW_LINE not showed up at the end of the buffer oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, buffer.length - bufferPos); bufferPos = NEED_READING; } return lineResult;}
readFile()按BUFFER_SIZE(默认8KB)作为缓冲读取日志文件数据
private void readFile() throws IOException { if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) { buffer = new byte[(int) (raf.length() - raf.getFilePointer())]; } else { buffer = new byte[BUFFER_SIZE]; } raf.read(buffer, 0, buffer.length); bufferPos = 0;}