再次完善python;

解决直播地址配置时未及时生效的bug;
(以后更新随缘)
pull/144/head
于俊 1 year ago
parent 51621d80fb
commit d80c5102c3
  1. 4
      app/src/main/java/com/github/tvbox/osc/ui/activity/SettingActivity.java
  2. 2
      pyramid/src/python/app.py
  3. 6
      pyramid/src/python/base/localProxy.py
  4. 191
      pyramid/src/python/base/spider.py

@ -184,8 +184,10 @@ public class SettingActivity extends BaseActivity {
AppManager.getInstance().finishAllActivity(); AppManager.getInstance().finishAllActivity();
jumpActivity(HomeActivity.class, createBundle()); jumpActivity(HomeActivity.class, createBundle());
} }
else if ((homeSourceKey != null && !homeSourceKey.equals(Hawk.get(HawkConfig.HOME_API, ""))) || homeRec != Hawk.get(HawkConfig.HOME_REC, 0) || !currentLiveApi.equals(Hawk.get(HawkConfig.LIVE_API_URL, ""))) { else if ((homeSourceKey != null && !homeSourceKey.equals(Hawk.get(HawkConfig.HOME_API, ""))) || homeRec != Hawk.get(HawkConfig.HOME_REC, 0)) {
jumpActivity(HomeActivity.class, createBundle()); jumpActivity(HomeActivity.class, createBundle());
}else if(!currentLiveApi.equals(Hawk.get(HawkConfig.LIVE_API_URL, ""))){
jumpActivity(HomeActivity.class);
} }
} else { } else {
AppManager.getInstance().finishAllActivity(); AppManager.getInstance().finishAllActivity();

@ -84,7 +84,7 @@ def init(ru,extend):
if sp != None: if sp != None:
sp.setExtendInfo(sParam[key]) sp.setExtendInfo(sParam[key])
spoList.append(sp) spoList.append(sp)
ru.setExtendInfo(extend) # ru.setExtendInfo(extend)
ru.init(extend) ru.init(extend)
def homeContent(ru,filter): def homeContent(ru,filter):

@ -0,0 +1,6 @@
class Proxy:
def getUrl(self, local):
return 'http://127.0.0.1:9978'
def getPort(self):
return 9978

@ -1,86 +1,151 @@
#coding=utf-8
#!/usr/bin/python
import re import re
import os
import json import json
import time
import requests import requests
from lxml import etree from lxml import etree
from abc import abstractmethod,ABCMeta from abc import abstractmethod, ABCMeta
from importlib.machinery import SourceFileLoader from importlib.machinery import SourceFileLoader
class Spider(metaclass=ABCMeta): # 元类 默认的元类 type from base.localProxy import Proxy
class Spider(metaclass=ABCMeta):
_instance = None _instance = None
def __init__(self):
self.extend = ''
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
if cls._instance: if cls._instance:
return cls._instance # 有实例则直接返回 return cls._instance
else: else:
cls._instance = super().__new__(cls) # 没有实例则new一个并保存 cls._instance = super().__new__(cls)
return cls._instance # 这个返回是给是给init,再实例化一次,也没有关系 return cls._instance
# # 这是简化的写法,上面注释的写法更容易提现判断思路
# if not cls._instance:
# cls._instance = super().__new__(cls)
# return cls._instance
@abstractmethod
def init(self,extend=""):pass
@abstractmethod
def homeContent(self,filter):pass
@abstractmethod
def homeVideoContent(self):pass
@abstractmethod
def categoryContent(self,tid,pg,filter,extend):pass
@abstractmethod
def detailContent(self,ids):pass
@abstractmethod
def searchContent(self,key,quick):pass
@abstractmethod @abstractmethod
def playerContent(self,flag,id,vipFlags):pass def init(self, extend=""):
# @abstractmethod pass
def homeContent(self, filter):
pass
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
pass
def detailContent(self, ids):
pass
def searchContent(self, key, quick, pg="1"):
pass
def playerContent(self, flag, id, vipFlags):
pass
def liveContent(self, url): def liveContent(self, url):
pass pass
@abstractmethod
def localProxy(self,param):pass def localProxy(self, param):
@abstractmethod pass
def isVideoFormat(self,url):pass
@abstractmethod def isVideoFormat(self, url):
def manualVideoCheck(self):pass pass
@abstractmethod
def getName(self):pass def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def getName(self):
pass
def getDependence(self): def getDependence(self):
return [] return []
def setExtendInfo(self,extend):
self.extend = extend def loadSpider(self, name):
def regStr(self,src,reg,group=1): return self.loadModule(name).Spider()
def loadModule(self, name):
path = os.path.join(os.path.join("../plugin"), f'{name}.py')
return SourceFileLoader(name, path).load_module()
def regStr(self, reg, src, group=1):
m = re.search(reg, src) m = re.search(reg, src)
src = '' src = ''
if m : if m:
src = m.group(group) src = m.group(group)
return src return src
def str2json(self,str):
return json.loads(str) def removeHtmlTags(self, src):
# cGroup = re.compile('[\U00010000-\U0010ffff]') clean = re.compile('<.*?>')
# clean = cGroup.sub('',rsp.text) return re.sub(clean, '', src)
def cleanText(self,src):
clean = re.sub('[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF]', '', src) def cleanText(self, src):
clean = re.sub('[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF]', '',
src)
return clean return clean
def fetch(self,url,headers={},cookies=""):
rsp = requests.get(url,headers=headers,cookies=cookies) def fetch(self, url, params=None, cookies=None, headers=None, timeout=5, verify=True, stream=False,
rsp.encoding='utf-8' allow_redirects=True):
return rsp rsp = requests.get(url, params=params, cookies=cookies, headers=headers, timeout=timeout, verify=verify,
def post(self,url,data,headers={},cookies={}): stream=stream, allow_redirects=allow_redirects)
rsp = requests.post(url,data=data,headers=headers,cookies=cookies) rsp.encoding = 'utf-8'
rsp.encoding='utf-8'
return rsp return rsp
def postJson(self,url,json,headers={},cookies={}):
rsp = requests.post(url,json=json,headers=headers,cookies=cookies) def post(self, url, params=None, data=None, json=None, cookies=None, headers=None, timeout=5, verify=True,
rsp.encoding='utf-8' stream=False, allow_redirects=True):
rsp = requests.post(url, params=params, data=data, json=json, cookies=cookies, headers=headers, timeout=timeout,
verify=verify, stream=stream, allow_redirects=allow_redirects)
rsp.encoding = 'utf-8'
return rsp return rsp
def html(self,content):
def html(self, content):
return etree.HTML(content) return etree.HTML(content)
def xpText(self,root,expr):
ele = root.xpath(expr) def str2json(str):
if len(ele) == 0: return json.loads(str)
return ''
def json2str(str):
return json.dumps(str, ensure_ascii=False)
def getProxyUrl(self, local=True):
return f'{Proxy.getUrl(local)}?do=py'
def log(self, msg):
if isinstance(msg, dict) or isinstance(msg, list):
print(json.dumps(msg, ensure_ascii=False))
else: else:
return ele[0] print(f'{msg}')
def loadModule(self,name,fileName):
return SourceFileLoader(name, fileName).load_module() def getCache(self, key):
value = self.fetch(f'http://127.0.0.1:{Proxy.getPort()}/cache?do=get&key={key}', timeout=5).text
if len(value) > 0:
if value.startswith('{') and value.endswith('}') or value.startswith('[') and value.endswith(']'):
value = json.loads(value)
if type(value) == dict:
if not 'expiresAt' in value or value['expiresAt'] >= int(time.time()):
return value
else:
self.delCache(key)
return None
return value
else:
return None
def setCache(self, key, value):
if type(value) in [int, float]:
value = str(value)
if len(value) > 0:
if type(value) == dict or type(value) == list:
value = json.dumps(value, ensure_ascii=False)
r = self.post(f'http://127.0.0.1:{Proxy.getPort()}/cache?do=set&key={key}', data={"value": value}, timeout=5)
return 'succeed' if r.status_code == 200 else 'failed'
def delCache(self, key):
r = self.fetch(f'http://127.0.0.1:{Proxy.getPort()}/cache?do=del&key={key}', timeout=5)
return 'succeed' if r.status_code == 200 else 'failed'
Loading…
Cancel
Save