Coverage for lib/lib_mail.py: 26%

101 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1__author__ = 'moilerat' 

2 

3import ssl 

4if hasattr(ssl, '_create_unverified_context'): 

5 ssl._create_default_https_context = ssl._create_unverified_context 

6 

7def get_mail(server, username, password, port, verbose = False, tempdir = None, privacy = False, debug = False): 

8 mail_server = MailParam(server, username, password, port, tempdir = tempdir, privacy = privacy, debug = debug) 

9 list_files = mail_server.get_new_mail_with_files() 

10 return list_files 

11 

12def find_from(text): 

13 import re 

14 # Je pense qu'il n'y a jamais de www.github.com 

15 pattern = r'([\-\w\.\d]+\@[\-\w]+\.[\w]+)' 

16 matches = re.findall(pattern, text) 

17 

18 list_found = [] 

19 

20 for match in matches: 

21 list_found.append(match) 

22 

23 if len(list_found) == 1: 

24 return list_found[0] 

25 else : 

26 print(" text parsed without results : " + str(text)) 

27 print("Unexpected multiples emails found : " + str(list_found)) 

28 return "vr@opio.fr" 

29 

30def extract_object(tuple_val_1): 

31 import email 

32 object_mail_aux = email.header.decode_header(tuple_val_1) 

33 if len(object_mail_aux) == 1 and len(object_mail_aux[0]) == 2: 

34 type_data = object_mail_aux[0][1] 

35 if type_data == None : 

36 object_mail = object_mail_aux[0][0] 

37 elif type_data == "utf-8" : 

38 object_mail = object_mail_aux[0][0].decode(type_data) 

39 else : 

40 print("Unexpected type_data : " + str(type_data)) 

41 object_mail = str(object_mail_aux[0][0]) 

42 return object_mail 

43 else : 

44 print("Unexpected len object_mail_aux : " + str(object_mail_aux)) 

45 return "" 

46 

47class MailParam(): 

48 def __init__(self, server, username, password, port = 995, tempdir = None, privacy = False, debug = False): 

49 self.server = server 

50 self.username = username 

51 self.password = password 

52 self.port = port 

53 self.tempdir = tempdir if tempdir != None else "" 

54 if privacy: 

55 print("Using tempdir " + str(tempdir)) 

56 self.privacy = privacy 

57 self.debug = debug 

58 

59 def get_new_mail_with_files(self, verbose = False): 

60 import poplib 

61 import email 

62 

63 list_files = [] 

64 

65 Mailbox = poplib.POP3_SSL(self.server, self.port) 

66 Mailbox.user(self.username) 

67 Mailbox.pass_(self.password) 

68 list_complete = Mailbox.list() 

69 NumofMessages = len(list_complete[1]) 

70 for i in range(NumofMessages): 

71 message = Mailbox.retr(i+1) 

72 #msgnum = int(message.split(' ')[0]) 

73 # WORKS Mailbox.dele(i + 1) 

74# str_message = email.message_from_string("".join(str(message[1]))) 

75 str_message = email.message_from_bytes(b'\n'.join(message[1])) 

76 

77 from_mail = "vr@opio.fr" 

78 object_mail = "" 

79 

80 if verbose : 

81 print(str_message._headers) 

82 from_found = False 

83 cc_mail = "" 

84 for tuple_val in str_message._headers : 

85 if tuple_val[0] == "DKIM-Signature": 

86 from_found = True 

87 # ('From', '"Victor Reutenauer" <victor@fotonower.com>') 

88 dkim_signature = tuple_val[1] 

89 if tuple_val[0] == "From": 

90 from_found = True 

91 # ('From', '"Victor Reutenauer" <victor@fotonower.com>') 

92 from_mail = find_from(tuple_val[1]) 

93 if tuple_val[0] == "Subject": 

94 try : 

95 object_mail = extract_object(tuple_val[1]) 

96 except Exception as e: 

97 print(str(e)) 

98 if tuple_val[0].lower() == "cc": 

99 #from_found = True 

100 cc_mail = find_from(tuple_val[1]) 

101 

102 # save attach 

103 for part in str_message.walk(): 

104 print(part.get_content_type()) 

105 

106 if part.get_content_maintype() == 'multipart': 

107 continue 

108 

109 if part.get('Content-Disposition') is None: 

110 continue 

111 

112 filename = part.get_filename() 

113 if not(filename): 

114 continue 

115 

116 import os 

117 filepath = os.path.join(self.tempdir, filename) 

118 if not os.path.exists(self.tempdir): 

119 os.makedirs(self.tempdir) 

120 fp = open(filepath, 'wb') 

121 fp.write(part.get_payload(decode=1)) 

122 fp.close 

123 file = {"from_mail" : from_mail, "file" : filepath, "object" : object_mail} 

124 if cc_mail != "": 

125 file["cc"] = cc_mail 

126 list_files.append(file) 

127 

128 if not self.debug: 

129 Mailbox.dele(i + 1) 

130 

131 Mailbox.quit() 

132 return list_files 

133 

134