forked from libretro/Lakka-LibreELEC
234 lines
9.5 KiB
Python
234 lines
9.5 KiB
Python
# SPDX-License-Identifier: GPL-2.0-only
|
|
# Copyright (C) 2022-present Team LibreELEC (https://libreelec.tv)
|
|
|
|
import urllib.request, urllib.parse, urllib.error, os, zipfile
|
|
from urllib.error import URLError
|
|
import urllib.parse as urlparse
|
|
import requests
|
|
import json
|
|
import subprocess
|
|
|
|
from urllib.parse import parse_qs
|
|
import xbmc, xbmcvfs, xbmcgui, xbmcaddon
|
|
import shutil
|
|
import sys
|
|
import xml.etree.ElementTree as ET
|
|
|
|
temp = xbmcvfs.translatePath('special://temp')
|
|
|
|
ADDON_NAME = xbmcaddon.Addon().getAddonInfo('name')
|
|
LS = xbmcaddon.Addon().getLocalizedString
|
|
|
|
# Ignore isbn tables
|
|
SCANTABLES = ['atsc', 'dvb-c', 'dvb-s', 'dvb-t']
|
|
GENERIC_URL = 'https://nextpvr.com/stable/linux/NPVR.zip'
|
|
|
|
class Controller():
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
def downloadScanTable(self):
|
|
# Taken from TVHeadend Addon
|
|
try:
|
|
url = 'https://github.com/tvheadend/dtv-scan-tables/archive/tvheadend.zip'
|
|
archive = os.path.join(temp, 'dtv_scantables.zip')
|
|
temp_folder = os.path.join(temp, 'dtv-scan-tables-tvheadend')
|
|
dest_folder = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('path')), 'dtv-scan-tables')
|
|
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30042), xbmcgui.NOTIFICATION_INFO)
|
|
urllib.request.urlretrieve(url, archive)
|
|
zip = zipfile.ZipFile(archive)
|
|
if zip.testzip() is not None: raise zipfile.BadZipfile
|
|
|
|
if os.path.exists(temp_folder): shutil.rmtree(temp_folder)
|
|
if os.path.exists(dest_folder): shutil.rmtree(dest_folder)
|
|
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30043), xbmcgui.NOTIFICATION_INFO)
|
|
for idx, folder in enumerate(SCANTABLES):
|
|
for z in zip.filelist:
|
|
if folder in z.filename: zip.extract(z.filename, temp)
|
|
|
|
for folder in SCANTABLES:
|
|
shutil.copytree(os.path.join(temp_folder, folder), os.path.join(dest_folder, folder))
|
|
if os.path.exists(temp_folder): shutil.rmtree(temp_folder)
|
|
os.remove(archive)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30039), xbmcgui.NOTIFICATION_INFO)
|
|
except URLError as e:
|
|
xbmc.log('Could not download file: %s' % e.reason, xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30040), xbmcgui.NOTIFICATION_ERROR)
|
|
except zipfile.BadZipfile:
|
|
xbmc.log('Could not extract files from zip, bad zipfile', xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30041), xbmcgui.NOTIFICATION_ERROR)
|
|
|
|
def updateNextPVR(self):
|
|
try:
|
|
dest_folder = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('path')), 'nextpvr-bin')
|
|
archive = os.path.join(temp, 'NPVR.zip')
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30011), xbmcgui.NOTIFICATION_INFO)
|
|
urllib.request.urlretrieve(GENERIC_URL, archive)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30012), xbmcgui.NOTIFICATION_INFO)
|
|
zip = zipfile.ZipFile(archive)
|
|
if zip.testzip() is not None: raise zipfile.BadZipfile
|
|
zip.close()
|
|
command = 'unzip -o {0} -d {1} > /dev/null'.format(archive, dest_folder)
|
|
xbmc.log('Running: %s' % command, xbmc.LOGDEBUG)
|
|
os.system(command)
|
|
os.remove(archive)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30039), xbmcgui.NOTIFICATION_INFO)
|
|
xbmc.log('NPVR.zip installed', xbmc.LOGDEBUG)
|
|
if xbmcgui.Dialog().yesno("NextPVR Server", LS(30020)):
|
|
self.id = xbmcaddon.Addon().getAddonInfo('id')
|
|
subprocess.call(['systemctl', 'restart', self.id])
|
|
|
|
except URLError as e:
|
|
xbmc.log('Could not download file: %s' % e.reason, xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30040), xbmcgui.NOTIFICATION_ERROR)
|
|
except zipfile.BadZipfile:
|
|
xbmc.log('Could not extract files from zip, bad zipfile', xbmc.LOGERROR)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, LS(30041), xbmcgui.NOTIFICATION_ERROR)
|
|
|
|
def sessionLogin(self):
|
|
self.session = requests.session()
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0'
|
|
}
|
|
response = self.session.get(self.url, headers=headers)
|
|
parsed = urlparse.urlparse(response.url)
|
|
salt = parse_qs(parsed.query)['salt'][0]
|
|
if self.hashedPassword == None:
|
|
passwordHash = self.hashMe(self.password)
|
|
else:
|
|
passwordHash = self.hashedPassword
|
|
combined = self.hashMe(salt + ':' + self.username + ':' + passwordHash)
|
|
response = self.session.get(self.url + 'login.html?hash='+combined)
|
|
if response.status_code != 200 and response.status_code != 302 :
|
|
print(response.text, response.status_code)
|
|
sys.exit()
|
|
for cookie in self.session.cookies:
|
|
self.session.cookies[cookie.name] = cookie.value
|
|
|
|
def doSessionRequest5(self, method, isJSON = True):
|
|
xbmc.log(method, xbmc.LOGDEBUG)
|
|
retval = False
|
|
getResult = None
|
|
url = self.url + 'service?method=' + method
|
|
try:
|
|
request = self.session.get(url, headers={"Accept" : "application/json"})
|
|
getResult = json.loads(request.text)
|
|
if request.status_code == 200 :
|
|
if 'stat' in getResult:
|
|
retval = getResult['stat'] == 'ok'
|
|
else:
|
|
retval = True
|
|
else:
|
|
xbmc.log(getResult, xbmc.LOGERROR)
|
|
|
|
except Exception as e:
|
|
xbmc.log(str(e), xbmc.LOGERROR)
|
|
|
|
return retval, getResult
|
|
|
|
def hashMe (self, thedata):
|
|
import hashlib
|
|
h = hashlib.md5()
|
|
h.update(thedata.encode('utf-8'))
|
|
return h.hexdigest()
|
|
|
|
def loginNextPVR(self):
|
|
base = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('profile')), 'config/config.xml')
|
|
tree = ET.parse(base)
|
|
root = tree.getroot()
|
|
child = root.find("WebServer")
|
|
self.port = child.find('Port').text
|
|
self.username = child.find('Username').text
|
|
self.hashedPIN = child.find('PinMD5').text
|
|
self.hashedPassword = child.find('Password').text.lower()
|
|
self.ip = '127.0.0.1'
|
|
self.url = 'http://{}:{}/'.format(self.ip, self.port)
|
|
self.sessionLogin()
|
|
|
|
|
|
def showMessage(self, message):
|
|
xbmc.log(message, xbmc.LOGDEBUG)
|
|
xbmcgui.Dialog().notification(ADDON_NAME, message, xbmcgui.NOTIFICATION_INFO)
|
|
|
|
|
|
def updateEpg(self):
|
|
self.loginNextPVR()
|
|
self.doSessionRequest5('system.epg.update')
|
|
self.doSessionRequest5('session.logout')
|
|
self.showMessage(LS(30015))
|
|
|
|
def updateM3u(self):
|
|
self.loginNextPVR()
|
|
self.doSessionRequest5('setting.m3u.update')
|
|
self.doSessionRequest5('session.logout')
|
|
self.showMessage(LS(30016))
|
|
|
|
def rescanDevices(self):
|
|
self.loginNextPVR()
|
|
self.doSessionRequest5('setting.devices&refresh=true')
|
|
self.doSessionRequest5('session.logout')
|
|
self.showMessage(LS(30017))
|
|
|
|
def transcodeHLS(self):
|
|
base = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('profile')), 'config/config.xml')
|
|
tree = ET.parse(base)
|
|
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True))
|
|
tree = ET.parse(base, parser=parser)
|
|
root = tree.getroot()
|
|
parent = root.find("WebServer")
|
|
child = parent.find('TranscodeHLS')
|
|
if child.text == 'default':
|
|
child.text = '-y [ANALYZE_DURATION] [SEEK] -i [SOURCE] -map_metadata -1 -threads [THREADS] -ignore_unknown -map 0:v:0? [PREFERRED_LANGUAGE] -map 0:a:[AUDIO_STREAM] -map -0:s -vcodec copy -acodec aac -ac 2 -c:s copy -hls_time [SEGMENT_DURATION] -start_number 0 -hls_list_size [SEGMENT_COUNT] -y [TARGET]'
|
|
else:
|
|
child.text = 'default'
|
|
tree.write(base, encoding='utf-8')
|
|
|
|
if child.text == 'default':
|
|
self.showMessage(LS(30018))
|
|
else:
|
|
self.showMessage(LS(30019))
|
|
|
|
def resetWebCredentials(self):
|
|
rewrite = False
|
|
base = os.path.join(xbmcvfs.translatePath(xbmcaddon.Addon().getAddonInfo('profile')), 'config/config.xml')
|
|
tree = ET.parse(base)
|
|
parser = ET.XMLParser(target=ET.TreeBuilder(insert_comments=True))
|
|
tree = ET.parse(base, parser=parser)
|
|
root = tree.getroot()
|
|
parent = root.find("WebServer")
|
|
child = parent.find('Username')
|
|
if child.text != 'admin':
|
|
child.text = 'admin'
|
|
rewrite = True
|
|
child = parent.find('Password')
|
|
if child.text != '5f4dcc3b5aa765d61d8327deb882cf99':
|
|
child.text = '5f4dcc3b5aa765d61d8327deb882cf99'
|
|
rewrite = True
|
|
if rewrite:
|
|
tree.write(base, encoding='utf-8')
|
|
self.showMessage(LS(30046))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
option = Controller()
|
|
try:
|
|
if sys.argv[1] == 'getscantables':
|
|
option.downloadScanTable()
|
|
elif sys.argv[1] == 'updategeneric':
|
|
option.updateNextPVR()
|
|
elif sys.argv[1] == 'updateepg':
|
|
option.updateEpg()
|
|
elif sys.argv[1] == 'transcode':
|
|
option.transcodeHLS()
|
|
elif sys.argv[1] == 'updatem3u':
|
|
option.updateM3u()
|
|
elif sys.argv[1] == 'rescan':
|
|
option.rescanDevices()
|
|
elif sys.argv[1] == 'defaults':
|
|
option.resetWebCredentials()
|
|
except IndexError:
|
|
pass
|