es_streamer

The es_streamer can discover files and stream them to standard output and is typically used to pipe data to the es_agent utility.

The file streamer command is:

es_streamer [--streamer] <glob> <delimiter> <fulfilment> [<passes>]

Where:

<glob> is a glob expression for finding the source files;

<delimiter> is the field delimiter used in the file records and must be a single character;

<fulfilment> is the fulfilment directory for receiving processed files;

<passes> can be used to limit the number of files processed.

Note that the optional --streamer switch is only used with the es_agent utility (see below).

In the glob expression for finding files, glob characters (? and *) must only appear within a file name and not in the directory path. It is also possible to run the FileStream with an explicit file path (without any glob characters) to process a single file.

The file streamer is provided for use with the es_agent to process records from a file stream where a file stream is a directory where new files are regularly deposited for processing. The file streamer requires the following:

  • Files deposited for processing are complete and closed files
  • All files are of the same format and use the same field delimiter
  • The file names are alphabetically ordered in the sequence they are to be processed.

 Hence a source directory for a file stream may contain files named data001.dat, data002.dat etc.

The file streamer will process files in name order and stream the records from each file to standard output. Each record streamed is prepended with the modification time of the file and the record number as the first field of each record output. This first field can be used a message state field and should be configured in the adaptor as a number field with a format mask of 19 digits (i.e. all 9s).  The modification time of a file is calculated relative to 1 Jan 2000 and resolved down to seconds and is presented in the first 10 digits while the record number is presented in the last 9 digits.

For example, the first two data records from a file may be prepended with 0535635255000000001 and 0535635255000000002 respectively.

Note that it is extremely important that the modification time for each file discovered is strictly increasing - otherwise messages will be discarded because of non-advancing state message state.

As each file is processed, it is moved to a specified fulfilment directory to avoid a file being processed more than once. Note that if you edit a file that has been processed, its modification time will change and if the file is moved back to the source directory, it will be treated and processed as new data.

This streamer will wait on files to appear in the source directory indefinitely and will execute until it is aborted via a process or command termination.

The file streamer can be used as input the standard input agent by piping its output to it. For example:

es_streamer <glob> <delimiter> <fulfilment> | es_agent <host> <network> <tag>

Normally, when es_streamer output is piped to the es_agent, they communicate on a record by record basis, but this means that the agent commits occur asynchronously with file boundaries. This can make recovery from failed or aborted file streaming difficult. But both the file streamer and agent can accept a --streamer switch which links commits with the end of each file read. The --streamer option should only be used when piping es_streamer output to the es_agent and when the streamer and agent both use the option.  

 

 

Have more questions? Submit a request

Comments

Article is closed for comments.

Powered by Zendesk