-
Notifications
You must be signed in to change notification settings - Fork 96
/
check_source.py
executable file
·800 lines (668 loc) · 36.9 KB
/
check_source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
#!/usr/bin/python3
import difflib
import glob
import os
import re
import shutil
import subprocess
import sys
import tempfile
from typing import Optional, Set
from cmdln import CmdlnOptionParser
from lxml import etree as ET
import osc.conf
import osc.core
from osclib.conf import Config
from osclib.core import devel_project_get
from osclib.core import devel_project_fallback
from osclib.core import entity_exists
from osclib.core import group_members
from osclib.core import package_kind
from osclib.core import create_add_role_request
from osclib.core import package_role_expand
from osclib.core import source_file_load
from osclib.core import project_pseudometa_package
from osc.core import show_package_meta, show_project_meta
from osc.core import get_request_list
from urllib.error import HTTPError
import ReviewBot
from osclib.conf import str2bool
class CheckSource(ReviewBot.ReviewBot):
SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__))
def __init__(self, *args, **kwargs):
ReviewBot.ReviewBot.__init__(self, *args, **kwargs)
# ReviewBot options.
self.request_default_return = True
self.skip_add_reviews = False
def target_project_config(self, project: str) -> None:
# Load project config and allow for remote entries.
config = Config.get(self.apiurl, project)
self.single_action_require = str2bool(config.get('check-source-single-action-require', 'False'))
self.ignore_devel: bool = not str2bool(config.get('devel-project-enforce', 'False'))
self.in_air_rename_allow = str2bool(config.get('check-source-in-air-rename-allow', 'False'))
self.add_review_team = str2bool(config.get('check-source-add-review-team', 'True'))
self.review_team = config.get('review-team')
self.mail_release_list = config.get('mail-release-list')
self.staging_group = config.get('staging-group')
self.required_maintainer = config.get('required-source-maintainer', '')
self.devel_whitelist = config.get('devel-whitelist', '').split()
self.skip_add_reviews = False
self.ensure_source_exist_in_baseproject = str2bool(config.get('check-source-ensure-source-exist-in-baseproject', 'False'))
self.devel_baseproject: str = config.get('check-source-devel-baseproject', '')
self.allow_source_in_sle = str2bool(config.get('check-source-allow-source-in-sle', 'True'))
self.sle_project_to_check = config.get('check-source-sle-project', '')
self.slfo_packagelist_to_check = config.get('check-source-slfo-packagelist-file', '')
self.allow_valid_source_origin = str2bool(config.get('check-source-allow-valid-source-origin', 'False'))
self.valid_source_origins: Set[str] = set(config.get('check-source-valid-source-origins', '').split(' '))
self.add_devel_project_review = str2bool(config.get('check-source-add-devel-project-review', 'False'))
self.allowed_scm_submission_sources = config.get('allowed-scm-submission-sources', '').split()
if self.action.type == 'maintenance_incident':
# The workflow effectively enforces the names to match and the
# parent code sets target_package from source_package so this check
# becomes useless and awkward to perform.
self.in_air_rename_allow = True
# The target project will be set to product and thus inherit
# settings, but override since real target is not product.
self.single_action_require = False
# It might make sense to supersede maintbot, but for now.
self.skip_add_reviews = True
def is_good_name(self, package: Optional[str], target_package: Optional[str]) -> bool:
self.logger.debug(f"is_good_name {package} <-> {target_package}")
if target_package is None:
# if the name doesn't matter, existance is all
return package is not None
return target_package == package
def package_source_parse(self, project, package, revision=None, target_package=None):
ret = self._package_source_parse(project, package, revision)
if self.is_good_name(ret['name'], target_package):
return ret
d = {}
for repo in osc.core.get_repositories_of_project(self.apiurl, project):
r = self._package_source_parse(project, package, revision, repo)
if r['name'] is not None:
d[r['name']] = r
if len(d) == 1:
# here is only one so use that
ret = d[next(iter(d))]
else:
# check if any name matches
self.logger.debug("found multiple names %s", ', '.join(d.keys()))
for n, r in d.items():
if n == target_package:
ret = r
break
if not self.is_good_name(ret['name'], target_package):
self.logger.error("none of the names matched")
return ret
def check_source_submission(
self,
source_project: str,
source_package: str,
source_revision: str,
target_project: str,
target_package: str
) -> bool:
super(CheckSource, self).check_source_submission(source_project,
source_package, source_revision, target_project, target_package)
self.target_project_config(target_project)
if self.single_action_require and len(self.request.actions) != 1:
self.review_messages['declined'] = 'Only one action per request allowed'
return False
if source_revision is None:
self.review_messages['declined'] = 'Submission not from a pinned source revision'
return False
kind = package_kind(self.apiurl, target_project, target_package)
if kind == 'meta':
self.review_messages['accepted'] = 'Skipping most checks for meta packages'
if not self.skip_add_reviews and self.add_review_team and self.review_team is not None:
if not (self.allow_valid_source_origin and source_project in self.valid_source_origins):
self.add_review(self.request, by_group=self.review_team, msg='Please review sources')
return True
elif (kind is not None and kind != 'source'):
self.review_messages['declined'] = f'May not modify a non-source package of type {kind}'
return False
if not self.allow_source_in_sle:
if self.sle_project_to_check and entity_exists(self.apiurl, self.sle_project_to_check, target_package):
self.review_messages['declined'] = ("SLE-base package, please submit to the corresponding SLE project."
"Or let us know the reason why needs to rebuild SLE-base package.")
return False
if self.slfo_packagelist_to_check:
pseudometa_project, pseudometa_package = project_pseudometa_package(self.apiurl, target_project)
if pseudometa_project and pseudometa_package:
metafile = ET.fromstring(source_file_load(self.apiurl, pseudometa_project, pseudometa_package,
self.slfo_packagelist_to_check))
slfo_pkglist = [package.attrib['name'] for package in metafile.findall('package')]
if target_package in slfo_pkglist:
self.review_messages['declined'] = ("Please create a new feature request "
f"https://code.opensuse.org/leap/features/issues for updating {target_package} "
"from SLFO (SLES, SL Micro). Alternatively, please provide "
"a reason for forking and rebuilding an existing SLFO package in Leap.")
return False
if self.ensure_source_exist_in_baseproject and self.devel_baseproject:
if not entity_exists(self.apiurl, self.devel_baseproject, target_package) and source_project not in self.valid_source_origins:
self.review_messages['declined'] = f"Per our development policy, please submit to {self.devel_baseproject} first."
return False
inair_renamed = target_package != source_package
if not self.ignore_devel:
self.logger.info('checking if target package exists and has devel project')
devel_project, devel_package = devel_project_get(self.apiurl, target_project, target_package)
if devel_project:
if (
(source_project != devel_project or source_package != devel_package)
and not (source_project == target_project and source_package == target_package)):
# check if the devel project & package are using scmsync & match the allowed prj prefix
# => waive the devel project source submission requirement
meta = ET.fromstringlist(show_package_meta(self.apiurl, devel_project, devel_package))
scm_sync = meta.find('scmsync')
if scm_sync is None:
# Not from proper devel project/package and not self-submission and not scmsync.
self.review_messages['declined'] = f'Expected submission from devel package {devel_project}/{devel_package}'
return False
scm_pool_repository = f"https://src.opensuse.org/pool/{source_package}"
if not scm_sync.text.startswith(scm_pool_repository):
# devel project uses scm sync not from the trusted src location
self.review_messages['declined'] = (
f"devel project scmsync setting is {scm_sync.text}. Must be {scm_pool_repository} instead.")
return False
if not self.source_is_scm_staging_submission(source_project):
# Not a submission coming from the scm-sync bot
self.review_messages['declined'] = "Expected a submitrequest coming from scm-sync project"
return False
else:
# Check to see if other packages exist with the same source project
# which indicates that the project has already been used as devel.
if not self.is_devel_project(source_project, target_project):
self.review_messages['declined'] = (
f'{source_project} is not a devel project of {target_project}, submit the package to a devel project first. '
'See https://en.opensuse.org/openSUSE:How_to_contribute_to_Factory#How_to_request_a_new_devel_project for details.'
)
return False
else:
if source_project.endswith(':Update'):
# Allow for submission like:
# - source: openSUSE:Leap:15.0:Update/google-compute-engine.8258
# - target: openSUSE:Leap:15.1/google-compute-engine
# Note: home:jberry:Update would also be allowed via this condition,
# but that should be handled by leaper and human review.
# Ignore a dot in package name (ex. tpm2.0-abrmd) and instead
# only look for ending in dot number.
match = re.match(r'(.*)\.\d+$', source_package)
if match:
inair_renamed = target_package != match.group(1)
# TODO(dmllr): ensure requird maintainers are set in the temporary project that is created
# by the scm-staging bot
if not self.source_is_scm_staging_submission(source_project) and not self.source_has_required_maintainers(source_project):
declined_msg = (
f'This request cannot be accepted unless {self.required_maintainer} is a maintainer of {source_project}.'
)
req = self.__ensure_add_role_request(source_project)
if req:
declined_msg += f' Created the add_role request {req} for addressing this problem.'
self.review_messages['declined'] = declined_msg
return False
if not self.in_air_rename_allow and inair_renamed:
self.review_messages['declined'] = 'Source and target package names must match'
return False
# Checkout and see if renaming package screws up version parsing.
copath = os.path.expanduser(f'~/co/{self.request.reqid}')
if os.path.exists(copath):
self.logger.warning(f'directory {copath} already exists')
shutil.rmtree(copath)
os.makedirs(copath)
os.chdir(copath)
try:
CheckSource.checkout_package(self.apiurl, target_project, target_package, pathname=copath,
server_service_files=True, expand_link=True)
shutil.rmtree(os.path.join(target_package, '.osc'))
os.rename(target_package, '_old')
except HTTPError as e:
if e.code == 404:
self.logger.info(f'target package does not exist {target_project}/{target_package}')
else:
raise e
CheckSource.checkout_package(self.apiurl, source_project, source_package, revision=source_revision,
pathname=copath, server_service_files=True, expand_link=True)
os.rename(source_package, target_package)
shutil.rmtree(os.path.join(target_package, '.osc'))
new_info = self.package_source_parse(source_project, source_package, source_revision, target_package)
filename = new_info.get('filename', '')
expected_name = target_package
if filename == '_preinstallimage':
expected_name = 'preinstallimage'
if not (filename.endswith('.kiwi') or filename == 'Dockerfile') and new_info['name'] != expected_name:
shutil.rmtree(copath)
self.review_messages['declined'] = (
f"A package submitted as {target_package} has to build as 'Name: {expected_name}' - found Name '{new_info['name']}'")
return False
if not self.check_service_file(target_package):
return False
if not self.check_rpmlint(target_package):
return False
specs = [os.path.basename(x) for x in glob.glob(os.path.join(target_package, "*.spec"))]
if specs and not self.check_spec_policy('_old', target_package, specs):
return False
if not self.run_source_validator('_old', target_package):
return False
if specs and not self.detect_mentioned_patches('_old', target_package, specs):
return False
if not self.check_urls('_old', target_package, specs):
osc.core.change_review_state(apiurl=self.apiurl,
reqid=self.request.reqid, newstate='new',
by_group=self.review_group,
by_user=self.review_user, message=self.review_messages['new'])
return None
shutil.rmtree(copath)
self.review_messages['accepted'] = 'Check script succeeded'
if self.skip_add_reviews:
return True
if self.add_review_team and self.review_team is not None:
if not (self.allow_valid_source_origin and source_project in self.valid_source_origins):
self.add_review(self.request, by_group=self.review_team, msg='Please review sources')
if self.add_devel_project_review:
devel_project, devel_package = devel_project_fallback(self.apiurl, target_project, target_package)
if devel_project and devel_package:
submitter = self.request.creator
maintainers = set(package_role_expand(self.apiurl, devel_project, devel_package))
known_maintainer = False
if maintainers:
if submitter in maintainers:
self.logger.debug(f"{submitter} is maintainer")
known_maintainer = True
if not known_maintainer:
for r in self.request.reviews:
if r.by_user in maintainers:
self.logger.debug(f"found {r.by_user} as reviewer")
known_maintainer = True
if not known_maintainer:
self.logger.warning(f"submitter: {submitter}, maintainers: {','.join(maintainers)} => need review")
self.logger.debug(f"adding review to {devel_project}/{devel_package}")
msg = ('Submission for {} by someone who is not maintainer in '
'the devel project ({}). Please review').format(target_package, devel_project)
self.add_review(self.request, by_project=devel_project, by_package=devel_package, msg=msg)
else:
self.logger.warning(f"{target_package} doesn't have devel project")
if self.only_changes():
self.logger.debug('only .changes modifications')
if self.staging_group and self.review_user in group_members(self.apiurl, self.staging_group):
if not self.dryrun:
osc.core.change_review_state(self.apiurl, str(self.request.reqid), 'accepted',
by_group=self.staging_group,
message='skipping the staging process since only .changes modifications')
else:
self.logger.debug('unable to skip staging review since not a member of staging group')
return True
def is_devel_project(self, source_project, target_project):
if source_project in self.devel_whitelist:
return True
# Allow any projects already used as devel projects for other packages.
search = {
'package': f"@project='{target_project}' and devel/@project='{source_project}'",
}
result = osc.core.search(self.apiurl, **search)
return result['package'].attrib['matches'] != '0'
def check_service_file(self, directory):
ALLOWED_MODES = ['localonly', 'disabled', 'buildtime', 'manual']
servicefile = os.path.join(directory, '_service')
if os.path.exists(servicefile):
services = ET.parse(servicefile)
for service in services.findall('service'):
mode = service.get('mode')
if mode in ALLOWED_MODES:
continue
allowed = ', '.join(ALLOWED_MODES)
name = service.get('name')
self.review_messages[
'declined'] = f"Services are only allowed if their mode is one of {allowed}. " + \
f"Please change the mode of {name} and use `osc service localrun/disabledrun`."
return False
# remove it away to have full service from source validator
os.unlink(servicefile)
for file in glob.glob(os.path.join(directory, "_service:*")):
file = os.path.basename(file)
self.review_messages['declined'] = f"Found _service generated file {file} in checkout. Please clean this up first."
return False
return True
def check_rpmlint(self, directory):
for rpmlintrc in glob.glob(os.path.join(directory, "*rpmlintrc")):
with open(rpmlintrc, 'r') as f:
for line in f:
if not re.match(r'^\s*setBadness', line):
continue
self.review_messages['declined'] = f"For product submissions, you cannot use setBadness. Use filters in {rpmlintrc}."
return False
return True
def check_spec_policy(self, old, directory, specs):
bname = os.path.basename(directory)
if not os.path.exists(os.path.join(directory, bname + '.changes')):
text = f"{bname}.changes is missing. "
text += "A package submitted as FooBar needs to have a FooBar.changes file with a format created by `osc vc`."
self.review_messages['declined'] = text
return False
specfile = os.path.join(directory, bname + '.spec')
if not os.path.exists(specfile):
self.review_messages['declined'] = f"{bname}.spec is missing. A package submitted as FooBar needs to have a FooBar.spec file."
return False
changes_updated = False
for spec in specs:
with open(os.path.join(directory, spec), 'r') as f:
content = f.read()
if not re.search(r'#[*\s]+Copyright\s', content):
text = f"{spec} does not appear to contain a Copyright comment. Please stick to the format\n\n"
text += "# Copyright (c) 2022 Unsong Hero\n\n"
text += "or use osc service runall format_spec_file"
self.review_messages['declined'] = text
return False
if re.search(r'\nVendor:', content):
self.review_messages['declined'] = "{spec} contains a Vendor line, this is forbidden."
return False
if not re.search(r'\n%changelog\s', content) and not re.search(r'\n%changelog$', content):
text = f"{spec} does not contain a %changelog line. We don't want a changelog in the spec file"
text += ", but the %changelog section needs to be present\n"
self.review_messages['declined'] = text
return False
if not re.search('#[^\n]*license', content, flags=re.IGNORECASE):
text = f"{spec} does not appear to have a license. The file needs to contain a free software license\n"
text += "Suggestion: use \"osc service runall format_spec_file\" to get our default license or\n"
text += "the minimal license:\n\n"
text += "# This file is under MIT license\n"
self.review_messages['declined'] = text
return False
# Check that we have for each spec file a changes file - and that at least one
# contains changes
changes = spec.replace('.spec', '.changes')
# new or deleted .changes files also count
old_exists = os.path.exists(os.path.join(old, changes))
new_exists = os.path.exists(os.path.join(directory, changes))
if old_exists != new_exists:
changes_updated = True
elif old_exists and new_exists:
if subprocess.run(["cmp", "-s", os.path.join(old, changes), os.path.join(directory, changes)]).returncode:
changes_updated = True
if not changes_updated:
self.review_messages['declined'] = "No changelog. Please use 'osc vc' to update the changes file(s)."
return False
return True
def source_is_scm_staging_submission(self, source_project):
"""Checks whether the source project is a scm_submission source project"""
return any(source_project.startswith(allowed_src) for allowed_src in self.allowed_scm_submission_sources)
def source_has_required_maintainers(self, source_project):
"""Checks whether the source project has the required maintainer
If a 'required-source-maintainer' is set, it checks whether it is a
maintainer for the source project. Inherited maintainership is
intentionally ignored to have explicit maintainer set.
source_project - source project name
"""
self.logger.info(
f'Checking required maintainer from the source project ({self.required_maintainer})'
)
if not self.required_maintainer:
return True
meta = ET.fromstringlist(show_project_meta(self.apiurl, source_project))
maintainers = meta.xpath('//person[@role="maintainer"]/@userid')
maintainers += ['group:' + g for g in meta.xpath('//group[@role="maintainer"]/@groupid')]
return self.required_maintainer in maintainers
def __ensure_add_role_request(self, source_project):
"""Returns add_role request ID for given source project. Creates that add role if needed."""
try:
add_roles = get_request_list(self.apiurl, source_project,
req_state=['new', 'review'], req_type='add_role')
add_roles = list(filter(self.__is_required_maintainer, add_roles))
if len(add_roles) > 0:
return add_roles[0].reqid
else:
add_role_msg = f'Created automatically from request {self.request.reqid}'
return create_add_role_request(self.apiurl, source_project, self.required_maintainer,
'maintainer', message=add_role_msg)
except HTTPError as e:
self.logger.error(
f'Cannot create the corresponding add_role request for {self.request.reqid}: {e}'
)
def __is_required_maintainer(self, request):
"""Returns true for add role requests that adds required maintainer user or group"""
action = request.actions[0]
user = self.required_maintainer
if user.startswith('group:'):
group = user.replace('group:', '')
return action.group_name == group and action.group_role == 'maintainer'
else:
return action.person_name == user and action.person_role == 'maintainer'
@staticmethod
def checkout_package(*args, **kwargs):
_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
try:
result = osc.core.checkout_package(*args, **kwargs)
finally:
sys.stdout = _stdout
return result
def _package_source_parse(self, project, package, revision=None, repository=None):
query = {'view': 'info', 'parse': 1}
if revision:
query['rev'] = revision
if repository:
query['repository'] = repository
url = osc.core.makeurl(self.apiurl, ['source', project, package], query)
ret = {'name': None, 'version': None}
try:
xml = ET.parse(osc.core.http_GET(url)).getroot()
except HTTPError as e:
self.logger.error(f'ERROR in URL {url} [{e}]')
return ret
if xml.find('error') is not None:
self.logger.error("%s/%s/%s: %s", project, package, repository, xml.find('error').text)
return ret
# ET boolean check fails.
if xml.find('name') is not None:
ret['name'] = xml.find('name').text
if xml.find('version') is not None:
ret['version'] = xml.find('version').text
if xml.find('filename') is not None:
ret['filename'] = xml.find('filename').text
self.logger.debug("%s/%s/%s: %s", project, package, repository, ret)
return ret
def only_changes(self):
u = osc.core.makeurl(self.apiurl, ['request', self.request.reqid],
{'cmd': 'diff', 'view': 'xml'})
try:
diff = ET.parse(osc.core.http_POST(u)).getroot()
for f in diff.findall('action/sourcediff/files/file/*[@name]'):
if not f.get('name').endswith('.changes'):
return False
return True
except HTTPError:
pass
return False
def check_action_add_role(self, request, action):
# Decline add_role request (assumed the bot acting on requests to Factory or similar).
message = f'Roles to packages are granted in the devel project, not in {action.tgt_project}.'
if action.tgt_package is not None:
project, package = devel_project_fallback(self.apiurl, action.tgt_project, action.tgt_package)
message += f' Send this request to {project}/{package}.'
self.review_messages['declined'] = message
return False
def check_action_delete_package(self, request, action):
self.target_project_config(action.tgt_project)
try:
result = osc.core.show_project_sourceinfo(self.apiurl, action.tgt_project, True, (action.tgt_package))
root = ET.fromstring(result)
except HTTPError:
return None
# Decline the delete request if there is another delete/submit request against the same package
query = "match=state/@name='new'+and+(action/target/@project='{}'+and+action/target/@package='{}')"\
"+and+(action/@type='delete'+or+action/@type='submit')".format(action.tgt_project, action.tgt_package)
url = osc.core.makeurl(self.apiurl, ['search', 'request'], query)
matches = ET.parse(osc.core.http_GET(url)).getroot()
if int(matches.attrib['matches']) > 1:
ids = [rq.attrib['id'] for rq in matches.findall('request')]
self.review_messages['declined'] = (
f"There is a pending request {','.join(ids)} to {action.tgt_project}/{action.tgt_package} in process.")
return False
# Decline delete requests against linked flavor package
linked = root.find('sourceinfo/linked')
if not (linked is None or self.check_linked_package(action, linked)):
return False
if not self.ignore_devel:
self.devel_project_review_ensure(request, action.tgt_project, action.tgt_package)
return True
def check_linked_package(self, action, linked):
if linked.get('project', action.tgt_project) != action.tgt_project:
return True
linked_package = linked.get('package')
self.review_messages['declined'] = f"Delete the package {linked_package} instead"
return False
def check_action_delete_project(self, request, action):
# Presumably if the request is valid the bot should be disabled or
# overridden, but seems like no valid case for allowing this (see #1696).
self.review_messages['declined'] = f'Deleting the {action.tgt_project} project is not allowed.'
return False
def check_action_delete_repository(self, request, action):
self.target_project_config(action.tgt_project)
if self.mail_release_list:
self.review_messages['declined'] = 'Deleting repositories is not allowed. ' \
'Contact {} to discuss further.'.format(self.mail_release_list)
return False
self.review_messages['accepted'] = 'unhandled: removing repository'
return True
def run_source_validator(self, old, directory):
scripts = glob.glob("/usr/lib/obs/service/source_validators/*")
if not scripts:
raise RuntimeError.new('Missing source validator')
for script in scripts:
if os.path.isdir(script):
continue
res = subprocess.run([script, '--batchmode', directory, old], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if res.returncode:
text = "Source validator failed. Try \"osc service runall source_validator\"\n"
text += res.stdout.decode('utf-8')
self.review_messages['declined'] = text
return False
for line in res.stdout.decode('utf-8').split("\n"):
# pimp up some warnings
if re.search(r'Attention.*not mentioned', line):
line = re.sub(r'\(W\) ', '', line)
self.review_messages['declined'] = line
return False
return True
def _snipe_out_existing_urls(self, old, directory, specs):
if not os.path.isdir(old):
return
oldsources = self._mentioned_sources(old, specs)
for spec in specs:
specfn = os.path.join(directory, spec)
nspecfn = specfn + '.new'
wf = open(nspecfn, 'w')
with open(specfn) as rf:
for line in rf:
m = re.match(r'(Source[0-9]*\s*):\s*(.*)$', line)
if m and m.group(2) in oldsources:
wf.write(m.group(1) + ":" + os.path.basename(m.group(2)) + "\n")
continue
wf.write(line)
wf.close()
os.rename(nspecfn, specfn)
def check_urls(self, old, directory, specs):
self._snipe_out_existing_urls(old, directory, specs)
oldcwd = os.getcwd()
with tempfile.TemporaryDirectory() as tmpdir:
os.chdir(directory)
res = subprocess.run(["/usr/lib/obs/service/download_files", "--enforceupstream",
"yes", "--enforcelocal", "yes", "--outdir", tmpdir], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if res.returncode:
self.review_messages['new'] = "Source URLs are not valid. Try `osc service runall download_files`.\n" + \
res.stdout.decode('utf-8')
os.chdir(oldcwd)
return False
os.chdir(oldcwd)
return True
def difflines(self, oldf, newf):
with open(oldf, 'r') as f:
oldl = f.readlines()
with open(newf, 'r') as f:
newl = f.readlines()
return list(difflib.unified_diff(oldl, newl))
def _mentioned_sources(self, directory, specs):
sources = set()
for spec in specs:
specfn = os.path.join(directory, spec)
if not os.path.exists(specfn):
continue
with open(specfn) as f:
for line in f:
m = re.match(r'Source[0-9]*\s*:\s*(.*)$', line)
if not m:
continue
sources.add(m.group(1))
return sources
def detect_mentioned_patches(self, old, directory, specs):
# new packages have different rules
if not os.path.isdir(old):
return True
opatches = self.list_patches(old)
npatches = self.list_patches(directory)
cpatches = opatches.intersection(npatches)
opatches -= cpatches
npatches -= cpatches
if not npatches and not opatches:
return True
patches_to_mention = {}
for p in opatches:
patches_to_mention[p] = 'old'
for p in npatches:
patches_to_mention[p] = 'new'
for changes in glob.glob(os.path.join(directory, '*.changes')):
base = os.path.basename(changes)
oldchanges = os.path.join(old, base)
if os.path.exists(oldchanges):
diff = self.difflines(oldchanges, changes)
else:
with open(changes, 'r') as f:
diff = ['+' + line for line in f.readlines()]
for line in diff:
pass
# Check if the line mentions a patch being added (starts with +)
# or removed (starts with -)
if not re.match(r'[+-]', line):
continue
# In any of those cases, remove the patch from the list
line = line[1:].strip()
for patch in list(patches_to_mention):
if line.find(patch) >= 0:
del patches_to_mention[patch]
# if a patch is mentioned as source, we ignore it
sources = self._mentioned_sources(directory, specs)
sources |= self._mentioned_sources(old, specs)
for s in sources:
patches_to_mention.pop(s, None)
if not patches_to_mention:
return True
lines = []
for patch, state in patches_to_mention.items():
# wording stolen from Raymond's declines :)
if state == 'new':
lines.append(f"A patch ({patch}) is being added without this addition being mentioned in the changelog.")
else:
lines.append(f"A patch ({patch}) is being deleted without this removal being mentioned in the changelog.")
self.review_messages['declined'] = '\n'.join(lines)
return False
def list_patches(self, directory):
ret = set()
for ext in ['*.diff', '*.patch', '*.dif']:
for file in glob.glob(os.path.join(directory, ext)):
ret.add(os.path.basename(file))
return ret
class CommandLineInterface(ReviewBot.CommandLineInterface):
def __init__(self, *args, **kwargs):
ReviewBot.CommandLineInterface.__init__(self, args, kwargs)
self.clazz = CheckSource
def get_optparser(self) -> CmdlnOptionParser:
parser = ReviewBot.CommandLineInterface.get_optparser(self)
parser.add_option('--skip-add-reviews', action='store_true', default=False,
help='skip adding review after completing checks')
return parser
def setup_checker(self):
bot = ReviewBot.CommandLineInterface.setup_checker(self)
bot.skip_add_reviews = self.options.skip_add_reviews
return bot
if __name__ == "__main__":
app = CommandLineInterface()
sys.exit(app.main())