1 | #The name of the main class must match the file name in lowercase |
---|
2 | import os |
---|
3 | import urllib |
---|
4 | import shutil |
---|
5 | import gi |
---|
6 | from gi.repository import Gio |
---|
7 | gi.require_version ('Snapd', '1') |
---|
8 | from gi.repository import Snapd |
---|
9 | gi.require_version('AppStreamGlib', '1.0') |
---|
10 | from gi.repository import AppStreamGlib as appstream |
---|
11 | import time |
---|
12 | #Module-level holder for the find_async result: snapmanager._search_cb stores the request it receives here and snapmanager._search_snap_async polls it until its type name contains 'Snapd'. It starts as a plain Gio.SimpleAsyncResult (needed for the async find method, perhaps only on xenial) |
---|
13 | wrap=Gio.SimpleAsyncResult() |
---|
class snapmanager:
    """Plugin that manages snap packages through snapd.

    The supported actions are the keys of ``plugin_actions`` ('install',
    'remove', 'pkginfo' and 'load'). Every action goes through
    execute_action(), which returns the shared ``result`` dictionary:
    ``result['status']`` is ``{'status': code, 'msg': text}`` and
    ``result['data']`` holds the payload of the action.
    """

    def __init__(self):
        """Set the default state and create the icon/image cache folders."""
        self.dbg = False
        self.progress = 0
        self.partial_progress = 0
        self.plugin_actions = {'install': 'snap', 'remove': 'snap', 'pkginfo': 'snap', 'load': 'snap'}
        # HOME may be unset (services, cron...); let expanduser resolve it then
        home = os.getenv("HOME") or os.path.expanduser("~")
        self.cache_dir = home + "/.cache/lliurex-store"
        self.icons_folder = self.cache_dir + "/icons"
        self.images_folder = self.cache_dir + "/images"
        self.result = {'data': {}, 'status': {}}
        self.disabled = False
        self.cli_mode = False
        # A cache is only enabled when its folder exists or could be created
        self.icon_cache_enabled = self._ensure_folder(self.icons_folder)
        if not self.icon_cache_enabled:
            self._debug("Icon cache disabled")
        self.image_cache_enabled = self._ensure_folder(self.images_folder)
        if not self.image_cache_enabled:
            self._debug("Image cache disabled")
    #def __init__

    def _ensure_folder(self, folder):
        """Create ``folder`` if needed; return True when it is usable."""
        try:
            os.makedirs(folder, exist_ok=True)
        except OSError:
            return False
        return True
    #def _ensure_folder

    def set_debug(self, dbg=True):
        """Enable or disable the debug output printed by _debug()."""
        self.dbg = dbg
        self._debug("Debug enabled")
    #def set_debug

    def _debug(self, msg=''):
        """Print ``msg`` with the plugin prefix when debugging is enabled."""
        if self.dbg:
            print('DEBUG snap: %s' % msg)
    #def debug

    def register(self):
        """Return the dictionary of actions handled by this plugin."""
        return(self.plugin_actions)
    #def register

    def enable(self, state=False):
        """Store ``state`` in ``self.disabled``.

        NOTE(review): the value is stored as-is in the *disabled* flag, so
        a true ``state`` makes execute_action() skip its work. Confirm with
        the callers that this is the intended meaning.
        """
        self.disabled = state
    #def enable

    def execute_action(self, action, applist=None, store=None, results=0):
        """Run ``action`` and return the result dictionary.

        action: 'load', 'install', 'remove' or 'pkginfo'.
        applist: iterable of app_info dictionaries (not used by 'load');
            None is treated as an empty list.
        store: appstream store to work on; a new one is created when it is
            empty or missing.
        results: unused, kept for interface compatibility.

        Returns ``self.result``. For 'load' the data is the store, for the
        other actions it is the list of processed app_info dictionaries.
        When the plugin is disabled the status is set to 9 and the data is
        the untouched store.
        """
        self.store = store if store else appstream.Store()
        self.progress = 0
        self.result['status'] = {'status': -1, 'msg': ''}
        self.result['data'] = ''

        if self.disabled:
            self._set_status(9)
            self.result['data'] = self.store
        else:
            self.snap_client = Snapd.Client()
            self.snap_client.connect_sync(None)
            if action == 'load':
                self.result['data'] = self._load_snap_store(self.store)
            else:
                handlers = {
                    'install': self._install_snap,
                    'remove': self._remove_snap,
                    'pkginfo': self._get_info,
                }
                handler = handlers.get(action)
                apps = applist or []
                processed = []
                for app_info in apps:
                    self.partial_progress = 0
                    if handler is not None:
                        processed.append(handler(app_info))
                    self.progress += round(self.partial_progress / len(apps), 1)
                    # Keep some room: 100 is only reported once all is done
                    self.progress = min(self.progress, 98)
                self.result['data'] = processed
        self.progress = 100
        return(self.result)
    #def execute_action

    def _set_status(self, status, msg=''):
        """Replace the status entry of the result with ``status``/``msg``."""
        self.result['status'] = {'status': status, 'msg': msg}
    #def _set_status

    def _callback(self, client, change, _, user_data):
        """Progress callback for install/remove operations.

        Adds up the totals and the done amounts of every task of ``change``
        and stores the percentage in ``self.progress``. The progress is
        left untouched while the tasks report no total, which avoids a
        division by zero.
        """
        total = 0
        done = 0
        for task in change.get_tasks():
            total += task.get_progress_total()
            done += task.get_progress_done()
        if total:
            self.progress = round((done / total) * 100)
    #def _callback

    def _load_snap_store(self, store):
        """Add an appstream app to ``store`` for every stable snap found.

        The search is synchronous in cli mode and asynchronous otherwise.
        Returns the same ``store``.
        """
        if self.cli_mode:
            pkgs = self._search_snap("*")
        else:
            pkgs = self._search_snap_async("*")
        self._set_status(1)
        for pkg in pkgs:
            store.add_app(self._generate_appstream_app(pkg))
        return(store)
    #def _load_snap_store

    def _generate_appstream_app(self, pkg):
        """Build and return an appstream App describing the snap ``pkg``."""
        name = pkg.get_name()
        # Summary and description are treated as empty text when missing
        summary = pkg.get_summary()
        snap_description = pkg.get_description() or ''

        app = appstream.App()
        bundle = appstream.Bundle()
        # The SNAP kind is resolved from its name: per the original author
        # the BundleKind value is not defined although kind_from_string works
        bundle.set_kind(bundle.kind_from_string('SNAP'))
        bundle.set_id(name + '.snap')
        app.add_bundle(bundle)
        app.set_name("C", name)
        app.add_pkgname(name + '.snap')
        app.add_category("Snap")
        release = appstream.Release()
        release.set_version(pkg.get_version())
        app.add_release(release)
        app.set_id("io.snapcraft.%s" % name + '.snap')
        # NOTE(review): this assigns an attribute instead of calling the
        # setter, so the id kind is not really set. Kept as it was; check
        # the bindings before turning it into app.set_id_kind(...)
        app.set_id_kind = appstream.IdKind.DESKTOP
        description = ("This is an Snap bundle of app %s. It hasn't been tested by our developers and comes from a 3rd party dev team. Please use it carefully." % name)
        app.set_description("C", description + '<br>' + snap_description)
        app.set_comment("C", summary)
        app.add_keyword("C", name)
        for word in (summary or '').split(' '):
            if word:
                app.add_keyword("C", word)

        icon_url = pkg.get_icon()
        if icon_url:
            icon = appstream.Icon()
            if self.icon_cache_enabled:
                icon.set_kind(appstream.IconKind.LOCAL)
                icon.set_name(self._download_file(icon_url, name, self.icons_folder))
            else:
                icon.set_kind(appstream.IconKind.REMOTE)
                icon.set_name(icon_url)
            app.add_icon(icon)

        snap_license = pkg.get_license()
        if snap_license:
            app.set_project_license(snap_license)

        # Only the first screenshot is published, by its remote url
        for snap_screen in (pkg.get_screenshots() or []):
            img = appstream.Image()
            img.set_kind(appstream.ImageKind.SOURCE)
            img.set_url(snap_screen.get_url())
            screenshot = appstream.Screenshot()
            screenshot.add_image(img)
            app.add_screenshot(screenshot)
            break
        return(app)
    #def _generate_appstream_app

    def _search_cb(self, obj, request, *args):
        """Async callback: publish ``request`` in the module-level ``wrap``."""
        global wrap
        wrap = request
    #def _search_cb

    def _search_snap_async(self, tokens):
        """Search snaps matching ``tokens`` with the asynchronous API.

        Starts find_async and polls the module-level ``wrap`` until
        _search_cb has replaced it with the result. Returns the list of
        found packages whose channel is 'stable'.
        """
        self._debug("Async Searching %s" % tokens)
        global wrap
        # Forget the result of any previous search: otherwise the wait loop
        # below ends at once and the stale result gets finished again
        wrap = Gio.SimpleAsyncResult()
        self.snap_client.find_async(Snapd.FindFlags.MATCH_NAME,
                                    tokens,
                                    None,
                                    self._search_cb, (None,), None)
        while 'Snapd' not in str(type(wrap)):
            time.sleep(0.1)
        snaps = self.snap_client.find_finish(wrap)
        pkgs = snaps if isinstance(snaps, list) else [snaps]
        return([pkg for pkg in pkgs if pkg.get_channel() == 'stable'])
    #def _search_snap_async

    def _search_snap(self, tokens):
        """Search snaps matching ``tokens`` with the synchronous API.

        Returns the list of found packages whose channel is 'stable'. When
        the search fails the error is printed, the status is set to 1 and
        an empty list is returned.
        """
        self._debug("Searching %s" % tokens)
        try:
            pkgs = self.snap_client.find_sync(Snapd.FindFlags.MATCH_NAME, tokens, None, None)
        except Exception as e:
            print(e)
            self._set_status(1)
            pkgs = []
        stable_pkgs = []
        for pkg in (pkgs or []):
            if pkg.get_channel() == 'stable':
                stable_pkgs.append(pkg)
            else:
                self._debug(pkg.get_channel())
        self._debug("Done")
        return(stable_pkgs)
    #def _search_snap

    def _download_file(self, url, app_name, dest_dir):
        """Download ``url`` to ``dest_dir``/``app_name``.png unless cached.

        Returns the path of the target file, also when the download fails
        (the failure is only reported through _debug). The data is written
        to a temporary '.part' file that is renamed once complete, so a
        failed download never leaves an empty or truncated file that later
        calls would take for a cached one.
        """
        # "import urllib" alone does not load the request submodule
        import urllib.request

        target_file = dest_dir + '/' + app_name + ".png"
        if os.path.isfile(target_file):
            return(target_file)

        self._debug("Downloading %s to %s" % (url, target_file))
        partial_file = target_file + ".part"
        try:
            with urllib.request.urlopen(url) as response, open(partial_file, 'wb') as out_file:
                # copyfileobj already loops until the stream is exhausted
                shutil.copyfileobj(response, out_file, 16 * 1024)
            os.replace(partial_file, target_file)
        except Exception as e:
            self._debug("Unable to download %s" % url)
            self._debug("Reason: %s" % e)
            try:
                os.remove(partial_file)
            except OSError:
                pass
        return(target_file)
    #def _download_file

    def _get_info(self, app_info):
        """Fill in the 'state' and 'size' of ``app_info`` and return it.

        The snap is 'installed' when snapd lists it and 'available'
        otherwise; in the latter case it is searched for (synchronously in
        cli mode, asynchronously otherwise; the original author notes that
        sync requests block under a gui and async ones block on cli).
        'size' is the download size, or the installed size when the former
        is empty, as a string; it is '0' when no package data was found.
        """
        self._debug("Getting info for %s" % app_info)
        try:
            pkgs = [self.snap_client.list_one_sync(app_info['name'])]
            app_info['state'] = 'installed'
        except Exception:
            app_info['state'] = 'available'
            if self.cli_mode:
                pkgs = self._search_snap(app_info['name'])
            else:
                pkgs = self._search_snap_async(app_info['name'])
        self._debug("Getting extended info for %s %s" % (app_info['name'], pkgs))
        size = '0'
        if isinstance(pkgs, list) and pkgs:
            # Only the first match is taken into account
            pkg = pkgs[0]
            self._debug("Getting extended info for %s" % app_info['name'])
            size = str(pkg.get_download_size() or pkg.get_installed_size())
        app_info['size'] = size
        self._debug("Info for %s" % app_info)
        self.partial_progress = 100
        return(app_info)
    #def _get_info

    def _install_snap(self, app_info):
        """Install the snap named ``app_info['name']`` and return ``app_info``.

        Status is set to 4 when the app is already marked as installed, to
        0 after a successful install (the state becomes 'installed') and
        to 5 when the install fails.
        """
        self._debug("Installing %s" % app_info['name'])
        if app_info.get('state') == 'installed':
            self._set_status(4)
        else:
            try:
                self.snap_client.install_sync(app_info['name'],
                                              None,  # channel
                                              self._callback, (None,),
                                              None)  # cancellable
                app_info['state'] = 'installed'
                self._set_status(0)
            except Exception as e:
                print("Install error %s" % e)
                self._set_status(5)

        self._debug("Installed %s" % app_info)
        return app_info
    #def _install_snap

    def _remove_snap(self, app_info):
        """Remove the snap named ``app_info['name']`` and return ``app_info``.

        Status is set to 3 when the app is marked as available (that is,
        not installed), to 0 after a successful removal (the state becomes
        'available') and to 6 when the removal fails.
        """
        if app_info.get('state') == 'available':
            self._set_status(3)
        else:
            try:
                self.snap_client.remove_sync(app_info['name'],
                                             self._callback, (None,),
                                             None)  # cancellable
                app_info['state'] = 'available'
                self._set_status(0)
            except Exception as e:
                print("Remove error %s" % e)
                self._set_status(6)
        return app_info
    #def _remove_snap