Lakka-LibreELEC/packages/addons/service/nextpvr/source/addon.py
2022-11-03 08:10:26 -04:00

234 lines
9.5 KiB
Python

# SPDX-License-Identifier: GPL-2.0-only
# Copyright (C) 2022-present Team LibreELEC (https://libreelec.tv)
import urllib.request, urllib.parse, urllib.error, os, zipfile
from urllib.error import URLError
import urllib.parse as urlparse
import requests
import json
import subprocess
from urllib.parse import parse_qs
import xbmc, xbmcvfs, xbmcgui, xbmcaddon
import shutil
import sys
import xml.etree.ElementTree as ET
# Kodi's 'special://temp' location translated by xbmcvfs; downloads are staged here
temp = xbmcvfs.translatePath('special://temp')
# Add-on name, used as the heading of every notification shown by this script
ADDON_NAME = xbmcaddon.Addon().getAddonInfo('name')
# Shorthand for looking up the add-on's localized strings by numeric id
LS = xbmcaddon.Addon().getLocalizedString
# Scan-table folders installed from the dtv-scan-tables archive;
# anything else in the archive (e.g. ISDB tables) is ignored
SCANTABLES = ['atsc', 'dvb-c', 'dvb-s', 'dvb-t']
# Archive downloaded and unpacked by Controller.updateNextPVR
GENERIC_URL = 'https://nextpvr.com/stable/linux/NPVR.zip'
class Controller():
    """Actions the add-on script can run for the NextPVR service.

    Covers downloading scan tables, updating the NextPVR binaries, calling
    the local NextPVR web service (EPG update, M3U update, device rescan)
    and editing NextPVR's config.xml (HLS transcode toggle, web credentials).
    """

    # Web credentials written by resetWebCredentials.
    _DEFAULT_USERNAME = 'admin'
    # MD5 hex digest of the string 'password'.
    _DEFAULT_PASSWORD_MD5 = '5f4dcc3b5aa765d61d8327deb882cf99'

    def __init__(self):
        pass

    def _configPath(self):
        """Return the path of config/config.xml inside the add-on profile."""
        profile = xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('profile'))
        return os.path.join(profile, 'config/config.xml')

    def _parseConfigKeepingComments(self):
        """Parse config.xml with comments retained.

        Returns a ``(path, tree)`` tuple. Comments are kept in the tree so
        that writing the tree back does not strip them from the file.
        """
        base = self._configPath()
        parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True))
        return base, ET.parse(base, parser=parser)

    def _callServiceAndNotify(self, method, messageId):
        """Log in, call one service method, log out, then show LS(messageId)."""
        self.loginNextPVR()
        self.doSessionRequest5(method)
        self.doSessionRequest5('session.logout')
        self.showMessage(LS(messageId))

    def downloadScanTable(self):
        """Download the dtv-scan-tables archive and install the SCANTABLES folders.

        The archive is fetched into the temp directory, the wanted folders are
        extracted there and then copied into the add-on's 'dtv-scan-tables'
        folder, replacing any previous copy. Download and bad-zip errors are
        logged and reported through a notification instead of being raised.
        """
        # Taken from TVHeadend Addon
        try:
            url = 'https://github.com/tvheadend/dtv-scan-tables/archive/tvheadend.zip'
            archive = os.path.join(temp, 'dtv_scantables.zip')
            temp_folder = os.path.join(temp, 'dtv-scan-tables-tvheadend')
            dest_folder = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('path')), 'dtv-scan-tables')
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30042), xbmcgui.NOTIFICATION_INFO)
            urllib.request.urlretrieve(url, archive)
            # The context manager closes the archive even when a check fails.
            with zipfile.ZipFile(archive) as scan_zip:
                if scan_zip.testzip() is not None:
                    raise zipfile.BadZipfile
                if os.path.exists(temp_folder):
                    shutil.rmtree(temp_folder)
                if os.path.exists(dest_folder):
                    shutil.rmtree(dest_folder)
                xbmcgui.Dialog().notification(ADDON_NAME, LS(30043), xbmcgui.NOTIFICATION_INFO)
                for member in scan_zip.filelist:
                    # Substring match on the member path, as in the original selection.
                    if any(folder in member.filename for folder in SCANTABLES):
                        scan_zip.extract(member.filename, temp)
            for folder in SCANTABLES:
                shutil.copytree(os.path.join(temp_folder, folder), os.path.join(dest_folder, folder))
            if os.path.exists(temp_folder):
                shutil.rmtree(temp_folder)
            os.remove(archive)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30039), xbmcgui.NOTIFICATION_INFO)
        except URLError as e:
            xbmc.log('Could not download file: %s' % e.reason, xbmc.LOGERROR)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30040), xbmcgui.NOTIFICATION_ERROR)
        except zipfile.BadZipfile:
            xbmc.log('Could not extract files from zip, bad zipfile', xbmc.LOGERROR)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30041), xbmcgui.NOTIFICATION_ERROR)

    def updateNextPVR(self):
        """Download NPVR.zip and unpack it over the add-on's 'nextpvr-bin' folder.

        After unpacking, the user is asked (yes/no dialog) whether to restart
        the systemd unit named after the add-on id. Download and bad-zip errors
        are logged and reported through a notification instead of being raised.
        """
        try:
            dest_folder = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('path')), 'nextpvr-bin')
            archive = os.path.join(temp, 'NPVR.zip')
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30011), xbmcgui.NOTIFICATION_INFO)
            urllib.request.urlretrieve(GENERIC_URL, archive)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30012), xbmcgui.NOTIFICATION_INFO)
            with zipfile.ZipFile(archive) as npvr_zip:
                if npvr_zip.testzip() is not None:
                    raise zipfile.BadZipfile
            # NOTE(review): presumably the external unzip is used (rather than
            # zipfile.extractall) to keep file permissions - confirm.
            # An argument list avoids shell quoting problems with the paths.
            command = ['unzip', '-o', archive, '-d', dest_folder]
            xbmc.log('Running: %s' % ' '.join(command), xbmc.LOGDEBUG)
            try:
                status = subprocess.call(command, stdout=subprocess.DEVNULL)
            except OSError as e:
                xbmc.log('Could not run unzip: %s' % e, xbmc.LOGERROR)
                status = -1
            if status != 0:
                # Flow continues as before; the failure is now at least visible in the log.
                xbmc.log('unzip exited with status %s' % status, xbmc.LOGERROR)
            os.remove(archive)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30039), xbmcgui.NOTIFICATION_INFO)
            xbmc.log('NPVR.zip installed', xbmc.LOGDEBUG)
            if xbmcgui.Dialog().yesno("NextPVR Server", LS(30020)):
                self.id = xbmcaddon.Addon().getAddonInfo('id')
                subprocess.call(['systemctl', 'restart', self.id])
        except URLError as e:
            xbmc.log('Could not download file: %s' % e.reason, xbmc.LOGERROR)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30040), xbmcgui.NOTIFICATION_ERROR)
        except zipfile.BadZipfile:
            xbmc.log('Could not extract files from zip, bad zipfile', xbmc.LOGERROR)
            xbmcgui.Dialog().notification(ADDON_NAME, LS(30041), xbmcgui.NOTIFICATION_ERROR)

    def sessionLogin(self):
        """Open a requests session against self.url and log in with a salted hash.

        Reads self.url, self.username and self.hashedPassword. The salt is taken
        from the 'salt' query parameter of the URL the first request ends up at.
        Exits the script (sys.exit) when the login response is neither 200 nor 302.
        """
        self.session = requests.session()
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'
        }
        response = self.session.get(self.url, headers=headers)
        parsed = urlparse.urlparse(response.url)
        salt = parse_qs(parsed.query)['salt'][0]
        if self.hashedPassword is None:
            # NOTE(review): self.password is not assigned anywhere in this class;
            # this branch only works if a caller sets it beforehand.
            passwordHash = self.hashMe(self.password)
        else:
            passwordHash = self.hashedPassword
        combined = self.hashMe(salt + ':' + self.username + ':' + passwordHash)
        response = self.session.get(self.url + 'login.html?hash=' + combined)
        if response.status_code not in (200, 302):
            xbmc.log('%s %s' % (response.text, response.status_code), xbmc.LOGERROR)
            sys.exit()
        # Re-store every cookie by name only. Iterate over a snapshot because
        # the assignment modifies the jar being walked.
        # NOTE(review): presumably done so the cookies are sent regardless of
        # the domain/path they were issued for - confirm.
        for cookie in list(self.session.cookies):
            self.session.cookies[cookie.name] = cookie.value

    def doSessionRequest5(self, method, isJSON = True):
        """Call 'service?method=<method>' on the logged-in session.

        method -- method name, optionally followed by '&key=value' parameters.
        isJSON -- unused; kept for interface compatibility.

        Returns (ok, result): result is the decoded JSON body (None when the
        request or decoding failed); ok is True for a 200 response whose
        'stat' is 'ok' or absent. Errors are logged, never raised.
        """
        xbmc.log(method, xbmc.LOGDEBUG)
        retval = False
        getResult = None
        url = self.url + 'service?method=' + method
        try:
            request = self.session.get(url, headers={"Accept" : "application/json"})
            getResult = json.loads(request.text)
            if request.status_code == 200:
                if 'stat' in getResult:
                    retval = getResult['stat'] == 'ok'
                else:
                    retval = True
            else:
                # xbmc.log needs a string; the decoded body is not one.
                xbmc.log(str(getResult), xbmc.LOGERROR)
        except Exception as e:
            xbmc.log(str(e), xbmc.LOGERROR)
        return retval, getResult

    def hashMe (self, thedata):
        """Return the MD5 hex digest of thedata encoded as UTF-8."""
        import hashlib
        return hashlib.md5(thedata.encode('utf-8')).hexdigest()

    def loginNextPVR(self):
        """Read the WebServer settings from config.xml and log in on 127.0.0.1."""
        tree = ET.parse(self._configPath())
        root = tree.getroot()
        child = root.find("WebServer")
        self.port = child.find('Port').text
        self.username = child.find('Username').text
        self.hashedPIN = child.find('PinMD5').text
        # Lower-cased before it is used as part of the login hash input.
        self.hashedPassword = child.find('Password').text.lower()
        self.ip = '127.0.0.1'
        self.url = 'http://{}:{}/'.format(self.ip, self.port)
        self.sessionLogin()

    def showMessage(self, message):
        """Write message to the debug log and show it as an info notification."""
        xbmc.log(message, xbmc.LOGDEBUG)
        xbmcgui.Dialog().notification(ADDON_NAME, message, xbmcgui.NOTIFICATION_INFO)

    def updateEpg(self):
        """Call 'system.epg.update' on the server, then show message 30015."""
        self._callServiceAndNotify('system.epg.update', 30015)

    def updateM3u(self):
        """Call 'setting.m3u.update' on the server, then show message 30016."""
        self._callServiceAndNotify('setting.m3u.update', 30016)

    def rescanDevices(self):
        """Call 'setting.devices' with refresh=true, then show message 30017."""
        self._callServiceAndNotify('setting.devices&refresh=true', 30017)

    def transcodeHLS(self):
        """Toggle WebServer/TranscodeHLS in config.xml between 'default' and a fixed command line.

        Shows message 30018 when the value ends up as 'default', 30019 otherwise.
        """
        base, tree = self._parseConfigKeepingComments()
        root = tree.getroot()
        parent = root.find("WebServer")
        child = parent.find('TranscodeHLS')
        if child.text == 'default':
            child.text = '-y [ANALYZE_DURATION] [SEEK] -i [SOURCE] -map_metadata -1 -threads [THREADS] -ignore_unknown -map 0:v:0? [PREFERRED_LANGUAGE] -map 0:a:[AUDIO_STREAM] -map -0:s -vcodec copy -acodec aac -ac 2 -c:s copy -hls_time [SEGMENT_DURATION] -start_number 0 -hls_list_size [SEGMENT_COUNT] -y [TARGET]'
        else:
            child.text = 'default'
        tree.write(base, encoding='utf-8')
        if child.text == 'default':
            self.showMessage(LS(30018))
        else:
            self.showMessage(LS(30019))

    def resetWebCredentials(self):
        """Reset WebServer Username/Password in config.xml to the defaults.

        The file is rewritten only when a value actually changed; message
        30046 is shown either way.
        """
        rewrite = False
        base, tree = self._parseConfigKeepingComments()
        root = tree.getroot()
        parent = root.find("WebServer")
        child = parent.find('Username')
        if child.text != self._DEFAULT_USERNAME:
            child.text = self._DEFAULT_USERNAME
            rewrite = True
        child = parent.find('Password')
        if child.text != self._DEFAULT_PASSWORD_MD5:
            child.text = self._DEFAULT_PASSWORD_MD5
            rewrite = True
        if rewrite:
            tree.write(base, encoding='utf-8')
        self.showMessage(LS(30046))
if __name__ == '__main__':
    option = Controller()
    # Maps the first command-line argument to the Controller action it triggers.
    actions = {
        'getscantables': option.downloadScanTable,
        'updategeneric': option.updateNextPVR,
        'updateepg': option.updateEpg,
        'transcode': option.transcodeHLS,
        'updatem3u': option.updateM3u,
        'rescan': option.rescanDevices,
        'defaults': option.resetWebCredentials,
    }
    # A missing or unknown argument is ignored. The check is done up front
    # (instead of catching IndexError around the call) so that an IndexError
    # raised inside an action is not silently swallowed.
    if len(sys.argv) > 1:
        action = actions.get(sys.argv[1])
        if action is not None:
            action()