-
Notifications
You must be signed in to change notification settings - Fork 0
/
pdf2mbox.py
132 lines (116 loc) · 4.06 KB
/
pdf2mbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""pdf2mbox.py: Code file."""
import argparse
import os.path
import sys
import magic
import xmpdf
import mailbox
import importlib
class Mbox:
    """
    A class that represents an MBOX.

    Instances can be used as context managers; leaving the ``with`` block
    closes the underlying MBOX file.

    Attributes
    ----------
    mbox : mailbox.mbox
        Standard-library mailbox object that messages are appended to.

    Methods
    -------
    addmsg(em):
        Adds email to mbox
    close():
        Flushes and closes the underlying MBOX file
    """

    def __init__(self, mbox_filename):
        """
        Create Mbox and associate it with mbox_filename.

        Creates mbox_filename if it doesn't exist. Opens and appends if exists.
        """
        self.mbox = mailbox.mbox(mbox_filename)

    def __enter__(self):
        """Return self so the instance can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the MBOX on leaving the ``with`` block; never swallow errors."""
        self.close()

    def close(self):
        """Flush pending writes and close the underlying MBOX file."""
        self.mbox.close()

    def _encode(self, s):
        """Return s with non-ASCII characters backslash-escaped.

        Falsy input (``None`` or an empty string) yields ``''``.
        """
        if not s:
            return ''
        return s.encode('ascii', errors='backslashreplace').decode('ascii')

    def addmsg(self, em):
        """Convert an Xmpdf email to mboxMessage format and add to MBOX.

        Parameters
        ----------
        em
            Email object exposing ``header.date``, ``header.from_email``,
            ``header.to``, ``header.subject`` and ``body``.
        """
        header = em.header
        date = self._encode(header.date)

        msg = mailbox.mboxMessage()
        # NOTE(review): there is no separator between 'Author' and the date,
        # and the stdlib mbox writer appears to build the "From " separator
        # line of an mboxMessage from get_from() rather than get_unixfrom(),
        # so this value may never reach the file. Kept unchanged so the
        # on-disk output stays the same -- confirm intent before changing.
        msg.set_unixfrom('Author' + date)
        # Date goes through _encode like every other header, so all fields
        # share one ASCII-only form and a missing date becomes ''.
        msg['Date'] = date
        msg['From'] = self._encode(header.from_email)
        msg['To'] = self._encode(header.to)
        msg['Subject'] = self._encode(header.subject)
        msg.set_payload(self._encode(em.body))

        # Hold the mailbox lock only for the actual write.
        self.mbox.lock()
        try:
            self.mbox.add(msg)
            self.mbox.flush()
        finally:
            self.mbox.unlock()
def pdf2mbox(pdf_filename, mbox_filename):
    """Extract emails from PDF file and store in MBOX File.

    Parameters
    ----------
    pdf_filename: str
        Name of PDF containing emails.
    mbox_filename: str
        Name of MBOX file that is destination for emails.

    Returns
    -------
    obj
        instance of Xmpdf class; contain representation of email collection
    """
    # The PDF stays open while the emails are iterated, so this works whether
    # or not Xmpdf reads from the file after construction.
    with open(pdf_filename, "rb") as pdf_file:
        xms = xmpdf.Xmpdf(pdf_file)
        mbox = Mbox(mbox_filename)
        try:
            for message in xms.emails:
                mbox.addmsg(message)
        finally:
            # Release the MBOX file handle even if a message fails to convert.
            mbox.mbox.close()
    return xms
# CLI
def cli():
    """Process command line arguments.

    Validates that the input exists, is a regular file and is a PDF, and
    removes an existing MBOX file when ``--overwrite`` is given. Exits the
    program with an error message if the input validation fails.

    Returns
    -------
    tuple
        ``(pdf_filename, mbox_filename, csv_filename)``. ``csv_filename`` is
        the writable text file object argparse opened for ``--csv`` (despite
        its name), or ``None`` when the option was not given.
    """
    # Imported here because a bare ``import importlib`` does not load the
    # ``importlib.metadata`` submodule.
    from importlib.metadata import PackageNotFoundError, version

    try:
        package_version = version('pdf2mbox')
    except PackageNotFoundError:
        # No installed distribution (e.g. run from a source checkout); the
        # CLI must still work, so fall back to a placeholder.
        package_version = 'unknown'

    parser = argparse.ArgumentParser(
        description='Generates an mbox from a PDF containing emails')
    parser.add_argument('pdf_file', help='PDF file provided as input')
    parser.add_argument('mbox_file', nargs='?', default='out.mbox',
                        help='Mbox file generated as output')
    parser.add_argument('--version', '-v', action='version',
                        version=f'%(prog)s {package_version}')
    parser.add_argument('--overwrite', '-o', action="store_true",
                        help='overwrite MBOX file if it exists')
    # NOTE: FileType opens (and truncates) the CSV file during parsing, i.e.
    # before the PDF checks below run.
    parser.add_argument('--csv', nargs='?', const='out.csv',
                        type=argparse.FileType('w', encoding='utf-8'),
                        help='generate CSV file output')
    cl_args = parser.parse_args()

    pdf_filename = cl_args.pdf_file
    csv_filename = cl_args.csv
    mbox_filename = cl_args.mbox_file
    mbox_overwrite = cl_args.overwrite

    # File handling
    # PDF file
    if not os.path.exists(pdf_filename):
        sys.exit(f'error: {pdf_filename} does not exist.')
    if not os.path.isfile(pdf_filename):
        sys.exit(f'error: {pdf_filename} is not a file.')
    if magic.from_file(pdf_filename, mime=True) != 'application/pdf':
        sys.exit(f'error: {pdf_filename} is not a PDF file.')

    # MBOX
    if os.path.exists(mbox_filename):
        if mbox_overwrite:
            print(f'Overwriting {mbox_filename}')
            os.remove(mbox_filename)
        else:
            print(f'Appending email messages to {mbox_filename}')
    else:
        print(f'Writing email messages in MBOX format to {mbox_filename}')

    # csv
    if csv_filename:
        print(f'Writing csv to {csv_filename.name}')

    return pdf_filename, mbox_filename, csv_filename
if __name__ == '__main__':
    # cli() returns (pdf path, mbox path, csv output); the first two feed the
    # conversion, the last one is optional.
    *paths, csv_filename = cli()
    xms = pdf2mbox(*paths)

    # Print the collection summary before any CSV export.
    summary = xms.info()
    print(summary)

    if csv_filename:
        xms.to_csv(csv_filename)