0
0
mirror of https://github.com/libretro/Lakka-LibreELEC.git synced 2025-01-19 14:12:08 +00:00
mglae afa2e54219 docker: fix python addon (rev 132)
- Retry connection on error
- Add periodically kodi termination check via timeout
- Remove unneeded thread
- Do not import oe
2020-12-25 16:16:35 +01:00

162 lines
4.5 KiB
Python
Executable File

#!/usr/bin/env python
"""docker monitor using docker /events HTTP streaming API"""
"""https://github.com/CyberInt/dockermon"""
from contextlib import closing
from functools import partial
from socket import socket, AF_UNIX, timeout
from subprocess import Popen, PIPE
from sys import stdout, version_info
import json
import shlex
if version_info[:2] < (3, 0):
from httplib import OK as HTTP_OK
from urlparse import urlparse
else:
from http.client import OK as HTTP_OK
from urllib.parse import urlparse
__version__ = '0.2.2'
# buffer size must be 256 or lower otherwise events won't show in realtime
bufsize = 256
default_sock_url = 'ipc:///var/run/docker.sock'
class DockermonError(Exception):
pass
def read_http_header(sock):
"""Read HTTP header from socket, return header and rest of data."""
buf = []
hdr_end = '\r\n\r\n'
while True:
buf.append(sock.recv(bufsize).decode('utf-8'))
data = ''.join(buf)
i = data.find(hdr_end)
if i == -1:
continue
return data[:i], data[i + len(hdr_end):]
def header_status(header):
"""Parse HTTP status line, return status (int) and reason."""
status_line = header[:header.find('\r')]
# 'HTTP/1.1 200 OK' -> (200, 'OK')
fields = status_line.split(None, 2)
return int(fields[1]), fields[2]
def connect(url):
"""Connect to UNIX or TCP socket.
url can be either tcp://<host>:port or ipc://<path>
"""
url = urlparse(url)
if url.scheme == 'tcp':
sock = socket()
netloc = tuple(url.netloc.rsplit(':', 1))
hostname = socket.gethostname()
elif url.scheme == 'ipc':
sock = socket(AF_UNIX)
netloc = url.path
hostname = 'localhost'
else:
raise ValueError('unknown socket type: %s' % url.scheme)
sock.connect(netloc)
return sock, hostname
def watch(callback, url=default_sock_url, run=None):
"""Watch docker events. Will call callback with each new event (dict).
url can be either tcp://<host>:port or ipc://<path>
"""
sock, hostname = connect(url)
if run:
sock.settimeout(1.5)
request = 'GET /events HTTP/1.1\nHost: %s\n\n' % hostname
request = request.encode('utf-8')
with closing(sock):
sock.sendall(request)
header, payload = read_http_header(sock)
status, reason = header_status(header)
if status != HTTP_OK:
raise DockermonError('bad HTTP status: %s %s' % (status, reason))
# Messages are \r\n<size in hex><JSON payload>\r\n
buf = [payload]
while True:
try:
chunk = sock.recv(bufsize)
except timeout:
if run():
continue
if run and not run():
raise DockermonError('stopped')
if not chunk:
raise EOFError('socket closed')
buf.append(chunk.decode('utf-8'))
data = ''.join(buf)
i = data.find('\r\n')
if i == -1:
continue
size = int(data[:i], 16)
start = i + 2 # Skip initial \r\n
if len(data) < start + size + 2:
continue
payload = data[start:start+size]
callback(json.loads(payload))
buf = [data[start+size+2:]] # Skip \r\n suffix
def print_callback(msg):
"""Print callback, prints message to stdout as JSON in one line."""
json.dump(msg, stdout)
stdout.write('\n')
stdout.flush()
def prog_callback(prog, msg):
"""Program callback, calls prog with message in stdin"""
pipe = Popen(prog, stdin=PIPE)
data = json.dumps(msg)
pipe.stdin.write(data.encode('utf-8'))
pipe.stdin.close()
if __name__ == '__main__':
from argparse import ArgumentParser
parser = ArgumentParser(description=__doc__)
parser.add_argument('--prog', default=None,
help='program to call (e.g. "jq --unbuffered .")')
parser.add_argument(
'--socket-url', default=default_sock_url,
help='socket url (ipc:///path/to/sock or tcp:///host:port)')
parser.add_argument(
'--version', help='print version and exit',
action='store_true', default=False)
args = parser.parse_args()
if args.version:
print('dockermon %s' % __version__)
raise SystemExit
if args.prog:
prog = shlex.split(args.prog)
callback = partial(prog_callback, prog)
else:
callback = print_callback
try:
watch(callback, args.socket_url)
except (KeyboardInterrupt, EOFError):
pass