forked from flakas/reconbot
-
Notifications
You must be signed in to change notification settings - Fork 6
/
run.py
107 lines (92 loc) · 2.77 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import schedule
import time
import os
import yaml
from reconbot.tasks import esi_notification_task
from reconbot.notifiers.caching import CachingNotifier
from reconbot.notifiers.discordwebhook import DiscordWebhookNotifier
from reconbot.notifiers.splitter import SplitterNotifier
from reconbot.apiqueue import ApiQueue
from reconbot.esi import ESI
from reconbot.sso import SSO
from dotenv import load_dotenv
load_dotenv()
notification_caching_timer = 10
webhook_url = os.getenv("WEBHOOK_URL")
sso_app_client_id = os.getenv("SSO_APP_CLIENT_ID")
sso_app_secret_key = os.getenv("SSO_APP_SECRET_KEY")
with open('./characters.yaml') as file:
# The FullLoader parameter handles the conversion from YAML
# scalar values to Python the dictionary format
characters_list = yaml.load(file)
discord = {
'webhook': {
'url': webhook_url
}
}
sso_app = {
'client_id': sso_app_client_id,
'secret_key': sso_app_secret_key
}
notifications = {
'whitelist': [
'AllWarDeclaredMsg',
'DeclareWar',
'AllWarInvalidatedMsg',
'AllyJoinedWarAggressorMsg',
'CorpWarDeclaredMsg',
'OwnershipTransferred',
'MoonminingExtractionStarted',
'MoonminingExtractionCancelled',
'MoonminingExtractionFinished',
'SovStructureDestroyed',
'SovStructureReinforced',
'StructureUnderAttack',
'OwnershipTransferred',
'StructureOnline',
# 'StructureFuelAlert',
'StructureAnchoring',
'StructureServicesOffline',
'StructureLostShields',
'StructureLostArmor',
'TowerAlertMsg',
'StationServiceEnabled',
'StationServiceDisabled',
'SovAllClaimAquiredMsg',
'SovStationEnteredFreeport',
'SovAllClaimLostMsg',
'SovStructureSelfDestructRequested',
'SovStructureSelfDestructFinished',
'StationConquerMsg',
]
}
my_discord_channels = CachingNotifier(
SplitterNotifier([
DiscordWebhookNotifier(
discord['webhook']['url']
)
]),
duration=7200
)
def api_to_sso(api):
return SSO(
sso_app['client_id'],
sso_app['secret_key'],
api['refresh_token'],
api['character_id']
)
api_queue_logistics = ApiQueue(list(map(api_to_sso, characters_list.values())))
def notifications_job_logistics():
esi_notification_task(
notifications,
api_queue_logistics,
'discord',
my_discord_channels
)
def run_and_schedule(characters, notifications_job):
notifications_job()
schedule.every(notification_caching_timer/len(characters)).minutes.do(notifications_job)
run_and_schedule(characters_list, notifications_job_logistics)
while True:
schedule.run_pending()
time.sleep(1)