Question

Update, 2013-09-12:

I've dug a bit deeper into systemd and its journal, and I've stumbled upon this, which states:

systemd-journald will forward all received log messages to the AF_UNIX SOCK_DGRAM socket /run/systemd/journal/syslog, if it exists, which may be used by Unix syslog daemons to process the data further.

As per the manpage, I set up my environment to also have syslog underneath, and I've tweaked my code accordingly:

// Line terminator appended to every log line. PHP_EOL is the platform's
// newline; the previous value "\n\r" was a reversed CRLF that left a stray
// carriage return at the start of the following line.
define('NL', PHP_EOL);

/**
 * Echoes one log line prefixed with an RFC 2822 timestamp.
 *
 * Called with no arguments it does nothing. Called with a single argument,
 * that argument is printed verbatim. Called with several arguments, the
 * first is a sprintf()-style format and the rest are its values.
 */
$log = function ()
{
    $args = func_get_args();

    if (!$args)
    {
        return;
    }

    $format = array_shift($args);

    // A lone argument may be arbitrary data (e.g. bytes read from a socket),
    // so it must not be interpreted as a format string: a '%' inside it
    // would otherwise corrupt the output or raise an error.
    $message = $args ? vsprintf($format, $args) : $format;

    echo '[' . date('r') . '] ' . $message . NL;
};

// Experiment: connect to the journald syslog forwarding socket as a datagram
// client, then poll it with socket_select() and log whatever is reported.
$syslog = '/var/run/systemd/journal/syslog';

// NOTE(review): the result of socket_create() is not checked before use.
$sock = socket_create(AF_UNIX, SOCK_DGRAM, 0);
$connection = socket_connect($sock, $syslog);

if (!$connection)
{
    $log('Couldn\'t connect to ' . $syslog);
}
else
{
    $log('Connected to ' . $syslog);

    // The same one-element list seeds the read, write and except sets below.
    $readables = array($sock);

    // Non-blocking mode: reads fail immediately instead of waiting when no
    // data is queued.
    socket_set_nonblock($sock);

    while (true)
    {
        // socket_select() rewrites the arrays it is given, so fresh copies
        // are handed over on every iteration.
        $read = $readables;
        $write = $readables;
        $except = $readables;

        // A timeout of 0 makes socket_select() return immediately, so this
        // loop never sleeps and logs on every pass.
        $select = socket_select($read, $write, $except, 0);

        $log('Changes: %d.', $select);
        $log('-------');
        $log('Read: %d.', count($read));
        $log('Write: %d.', count($write));
        $log('Except: %d.', count($except));

        if ($select > 0)
        {
            // Sockets reported as readable: try to pull up to 4096 bytes.
            if ($read)
            {
                foreach ($read as $readable)
                {
                    $data = socket_read($readable, 4096, PHP_BINARY_READ);

                    if ($data === false)
                    {
                        // Log the numeric socket error and its description.
                        $log(socket_last_error() . ': ' . socket_strerror(socket_last_error()));
                    }
                    // empty() also treats the string "0" as empty, so such a
                    // payload falls through to the 'Read empty.' branch.
                    else if (!empty($data))
                    {
                        $log($data);
                    }
                    else
                    {
                        $log('Read empty.');
                    }
                }
            }

            // NOTE(review): this branch *reads* from sockets that were
            // reported as writable. Writability only means a send would not
            // block; it says nothing about pending input, so on this
            // non-blocking socket the read presumably fails with EAGAIN.
            if ($write)
            {
                foreach ($write as $writable)
                {
                    $data = socket_read($writable, 4096, PHP_BINARY_READ);

                    if ($data === false)
                    {
                        $log(socket_last_error() . ': ' . socket_strerror(socket_last_error()));
                    }
                    else if (!empty($data))
                    {
                        $log($data);
                    }
                    else
                    {
                        $log('Write empty.');
                    }
                }
            }
        }
    }
}

This apparently only sees (selects) changes on write sockets. It might be that something here is wrong, so I attempted to read from them anyway, with no luck (nor should there be):

[Thu, 12 Sep 2013 14:45:15 +0300] Changes: 1.
[Thu, 12 Sep 2013 14:45:15 +0300] -------
[Thu, 12 Sep 2013 14:45:15 +0300] Read: 0.
[Thu, 12 Sep 2013 14:45:15 +0300] Write: 1.
[Thu, 12 Sep 2013 14:45:15 +0300] Except: 0.
[Thu, 12 Sep 2013 14:45:15 +0300] 11: Resource temporarily unavailable

Now, this drives me nuts a little. syslog documentation says this should be possible. What is wrong with the code?

Original:

I had a working prototype, by simply:

while(true)
{
    // exec() appends to an output array that already has elements, so start
    // from an empty one on every pass; otherwise $result keeps every line
    // ever read and grows without bound.
    $result = array();

    // Fetch the single newest journal entry; $exit receives the exit status.
    exec('journalctl -r -n 1 | more', $result, $exit);

    // do stuff
}

But this feels wrong and consumes too many system resources. Then I found out about journald having sockets.

I have attempted to connect and read from:

AF_UNIX, SOCK_DGRAM : /var/run/systemd/journal/socket
AF_UNIX, SOCK_STREAM : /var/run/systemd/journal/stdout

the given sockets.

With /var/run/systemd/journal/socket, socket_select sees 0 changes. With /var/run/systemd/journal/stdout I always (every loop) get 1 change, with 0 byte data.

This is my "reader":

<?php

// Line terminator appended to every log line.
define('NL', "\n\r");

// The two journald socket paths that were tried.
$journal = '/var/run/systemd/journal/socket';
$jSTDOUT = '/var/run/systemd/journal/stdout';

// Overrides the path assigned above: the stdout socket is the one used below.
$journal = $jSTDOUT;

$sock = socket_create(AF_UNIX, SOCK_STREAM, 0);
// '@' silences the warning on failure; the boolean result is checked below.
$connection = @socket_connect($sock, $journal);

// Echoes $message prefixed with an RFC 2822 timestamp and followed by NL.
$log = function ($message)
{
    echo '[' . date('r') . '] ' . $message . NL; 
};

if (!$connection)
{
    $log('Couldn\'t connect to ' . $journal);
}
else
{
    $log('Connected to ' . $journal);

    $readables = array($sock);

    while (true)
    {
        // socket_select() rewrites $read, so it is re-seeded each iteration.
        $read = $readables;

        // Only readability is watched. The zero timeout returns immediately,
        // so the `continue` below re-polls at once (busy-wait). A false
        // return (select error) also compares as < 1 and is retried silently.
        if (socket_select($read, $write = NULL, $except = NULL, 0) < 1)
        {
            continue;
        }

        foreach ($read as $read_socket)
        {
            $data = @socket_read($read_socket, 1024, PHP_BINARY_READ);

            if ($data === false)
            {
                $log('Couldn\'t read.');

                // 2 = shut down both the reading and the writing side.
                socket_shutdown($read_socket, 2);
                socket_close($read_socket);

                $log('Server terminated.');
                // Leave both the foreach and the enclosing while loop.
                break 2;
            }

            $data = trim($data);

            // NOTE(review): an empty result is silently ignored. On a stream
            // socket a zero-length read typically means there is no more data
            // to read (peer closed) - confirm; if so, the socket stays
            // "readable" forever and this loop spins without output.
            if (!empty($data))
            {
                $log($data);
            }
        }
    }

    $log('Exiting.');
}

Having no data in read socket(s), I assume I'm doing something wrong.

Question, idea:

My goal is to read the messages and upon some of them, execute a callback.

Could anyone point me into the right direction of how to programmatically read journal messages?

Was it helpful?

Solution

The sockets under /run/systemd/journal/ won't work for this – …/socket and …/stdout are actually write-only (i.e. used for feeding data into the journal) while the …/syslog socket is not supposed to be used by anything other than a real syslogd, not to mention journald does not send any metadata over it. (In fact, the …/syslog socket doesn't even exist by default – syslogd must actually listen on it, and journald connects to it.)

The official method is to read directly from the journal files, and use inotify to watch for changes (which is the same thing journalctl --follow and even tail -f /var/log/syslog use in place of polling). In a C program, you can use the functions from libsystemd-journal, which will do the necessary parsing and even filtering for you.

In other languages, you have three choices: call the C library; parse the journal files yourself (the format is documented); or fork journalctl --follow which can be told to output JSON-formatted entries (or the more verbose journal export format). The third option actually works very well, since it only forks a single process for the entire stream; I have written a PHP wrapper for it (see below).

Recent systemd versions (v193) also come with systemd-journal-gatewayd, which is essentially an HTTP-based version of journalctl; that is, you can get a JSON or journal-export stream at http://localhost:19531/entries. (Both gatewayd and journalctl even support server-sent events for accessing the stream from HTML 5 webpages.) However, due to obvious security issues, gatewayd is disabled by default.


Attachment: PHP wrapper for journalctl --follow

<?php
/* © 2013 Mantas Mikulėnas <grawity@gmail.com>
 * Released under the MIT Expat License <https://opensource.org/licenses/MIT>
 */

/* Iterator extends Traversable {
    void    rewind()
    boolean valid()
    void    next()
    mixed   current()
    scalar  key()
}
calls:  rewind, valid==true, current, key
    next, valid==true, current, key
    next, valid==false
*/

/**
 * Iterates over systemd journal entries by reading the JSON output of a
 * single long-running `journalctl -f -o json` child process.
 *
 * Keys are the entries' `__CURSOR` values; values are the objects produced
 * by json_decode() for each output line.
 */
class Journal implements Iterator {
    private $filter;
    private $startpos;
    private $proc;
    private $stdout;
    /* null: nothing fetched yet; false: stream exhausted or not open;
     * object: the current entry */
    private $entry;

    /**
     * Joins an argument vector into one shell-safe command string.
     *
     * @param array $argv Individual command-line words.
     * @return string Space-separated, shell-quoted words.
     */
    static function _join_argv($argv) {
        return implode(" ",
            array_map(function($a) {
                return strlen($a) ? escapeshellarg($a) : "''";
            }, $argv));
    }

    /**
     * @param array       $filter Extra journalctl arguments (e.g. matches).
     * @param string|null $cursor Cursor to start from, or null for none.
     */
    function __construct($filter=[], $cursor=null) {
        $this->filter = $filter;
        $this->startpos = $cursor;
    }

    /** Stops the child process when the iterator is discarded. */
    function __destruct() {
        $this->_close_journal();
    }

    /**
     * Stops the journalctl child (if any) and forgets the current entry.
     */
    function _close_journal() {
        /* `journalctl -f` never exits on its own, and proc_close() waits
         * for the child to exit, so it has to be told to stop first. */
        if ($this->proc)
            proc_terminate($this->proc);
        if ($this->stdout) {
            fclose($this->stdout);
            $this->stdout = null;
        }
        if ($this->proc) {
            proc_close($this->proc);
            $this->proc = null;
        }
        $this->entry = null;
    }

    /**
     * (Re)starts journalctl with the given filter and start cursor, and
     * remembers both for later rewind()/seek() calls.
     *
     * @param array       $filter Extra journalctl arguments.
     * @param string|null $cursor Cursor to start from, or null for none.
     * @return bool False when the process could not be started, else true.
     */
    function _open_journal($filter=[], $cursor=null) {
        $this->_close_journal();

        $this->filter = $filter;
        $this->startpos = $cursor;

        $cmd = ["journalctl", "-f", "-o", "json"];
        if ($cursor) {
            $cmd[] = "-c";
            $cmd[] = $cursor;
        }
        $cmd = array_merge($cmd, $filter);
        /* `exec` makes the shell replace itself with journalctl, so that
         * proc_terminate() signals journalctl rather than a wrapper shell. */
        $cmd = "exec " . self::_join_argv($cmd);

        $fdspec = [
            0 => ["file", "/dev/null", "r"],
            1 => ["pipe", "w"],
            2 => ["file", "/dev/null", "w"],
        ];

        $proc = proc_open($cmd, $fdspec, $fds);
        if (!is_resource($proc)) {
            /* mark the iterator as exhausted instead of leaving it in the
             * "nothing fetched yet" state with no stream to read from */
            $this->entry = false;
            return false;
        }
        $this->proc = $proc;
        $this->stdout = $fds[1];
        return true;
    }

    /**
     * Restarts the stream at $cursor; this also becomes the position that
     * rewind() returns to.
     */
    function seek($cursor) {
        $this->_open_journal($this->filter, $cursor);
    }

    #[\ReturnTypeWillChange]
    function rewind() {
        $this->seek($this->startpos);
    }

    /**
     * Reads the next entry from the stream. Lines that do not decode to an
     * object are skipped; when the stream ends or is not open, the iterator
     * becomes invalid.
     */
    #[\ReturnTypeWillChange]
    function next() {
        $this->entry = false;
        if (!is_resource($this->stdout))
            return;
        while (($line = fgets($this->stdout)) !== false) {
            $entry = json_decode($line);
            if (is_object($entry)) {
                $this->entry = $entry;
                return;
            }
        }
    }

    /**
     * Fetches the first entry on demand, so that an empty or failed stream
     * is reported as invalid before the loop body ever runs.
     */
    #[\ReturnTypeWillChange]
    function valid() {
        if ($this->entry === null)
            $this->next();
        return ($this->entry !== false);
    }

    /** @return object|false The current entry, or false when exhausted. */
    #[\ReturnTypeWillChange]
    function current() {
        if ($this->entry === null)
            $this->next();
        return $this->entry;
    }

    /** @return string|null The current entry's cursor, or null if none. */
    #[\ReturnTypeWillChange]
    function key() {
        if ($this->entry === null)
            $this->next();
        if (is_object($this->entry) && isset($this->entry->__CURSOR))
            return $this->entry->__CURSOR;
        return null;
    }
}

// Demo: walk the journal stream, dumping each entry's cursor and message.
$journal = new Journal();

foreach ($journal as $cursor => $entry) {
    echo "================\n";
    var_dump($cursor);

    // Nothing more to show when no entry object came back.
    if (!$entry) {
        continue;
    }

    var_dump($entry->MESSAGE);
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top