Coverage for auth/lib_auth.py: 59%

204 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-01-26 23:58 +0100

1# -*- coding: utf-8 -*- 

2__author__ = 'moilerat' 

3# 

4# # Safia copyright 2022-2023 VR - LICENSETOBEDEFINED : on premise treatment custom 

5# 

6#! Access LibExternal and configure context param json 

7# 

8 

9from abc import abstractmethod 

10 

11# 23-7-23 Tentative d'écrire de l'architecture d'une lib_auth 

12# LibAuth fournit un système d'authenfitication et renvoie un ContextUser (ie appelé LSS as of 22-7-23 

13# Elle contient un membre 

14# Tout ceci peut-etre un singleton 

15# Son type d'authentification est défini soit par un paramètre, soit par un héritage (- [ ] TODO a choisir entre les deux) de son membre de connection (ses en cas de paramètre plutot qu'héritage) 

16# - OTP API 

17# - OAuth Ext 

18# - User/Pass 

19# - [ ] TODO choisir si on veut que lib_auth soit un singleton et du coup n'ait qu'une fonction pour authentifier 

20# plutot que de stocker les authentification en temps que membre 

21# 

22# 

23from auth.lib_conf_system import lcs_global_singleton 

24class LibAuth(): 

25 def __init__(self, type_auth = "otp_apia", lib_data = None, param_connect = {}): 

26 self.type_auth = type_auth 

27 if lib_data == None: 

28 if self.type_auth == "otp_apia": 

29 self.lib_data = lcs_global_singleton.get_lib_at_apia_singleton() 

30 elif self.type_auth == "ext_oauth": 

31 self.lib_data = "TODO AND HOW TO CHOOSE" 

32 elif self.type_auth == "user_pass": 

33 self.lib_data = lcs_global_singleton.get_lib_pg_singleton() 

34 else : 

35 print("Type auth " + self.type_auth + " not supported !") 

36 else : 

37 self.lib_data = lib_data 

38 self.param_connect = param_connect 

39 pass 

40 

41 # - [ ] TODO VR a choisir lib_auth pourrait ne donner que get_user et ensuite LSS renverrai le contex_user en fonction des connexions existantes pour ce user 

42 def connect_get_context_user(self): 

43 pass 

44 

45 

46# [ ] TODO Une classe qui soit une couche qui garantie qu'elle vérifie les droits 

47# Appelé par exemple ContextRightUser (c'est le lib_right actuel), qui aurait un user_id en interne, elle serait aussi crée à la création par lib_auth 

48# Préfère t'on qu'elle encapsule les fonctions d'accès aux données ou plutot qu'elle soit appelé (à priori par lib_right) 

49# Le premier cas fais qu'on doit copier-coller les noms des fonctions 

50# Le deuxième cas est plus risqué car on peut oublié de le faire 

51# 

52# Moins critique que l'authentification en temps qu'un user 

53# les droits d'accès aux projets pourrait etre aussi fais 

54# dans une lib tierces encapsulé de manière adéquate de manière à empecher leur accès sans droits 

55# - [ ] TODO VR choisir la facon d'encapsuler lib_right 

56class ContextRightUser(): 

57 def __init__(self, user_id, lib_right_singleton = None, lib_data_internal_singleton = None): 

58 self.user_id = user_id 

59 self.lib_right_singleton = lib_right_singleton 

60 self.lib_data_internal_singleton = lib_data_internal_singleton 

61 

62# LSS aurait donc comme membre ContextRightUser, tout ceci construit par register_user_get_data ou plutot LibAuth 

63# Les accès à des données interne ou externe peuvent aussi avoir une architecture adéquate 

64 

65# - [ ] TODO VR 22-7-23 : 

66# 

67 

68# TODO VR 5-7-23 For now it will be LibPGSafiaSys 

69# pas utile s'il n'est pas prévu de changer 

70class LibUserDataInternalService(): 

71 def __init__(self): 

72 pass 

73 

74class LibUserDataExternalServices(): 

75 def __init__(self, type_auth = "hard_coded", lib_at = None): 

76 self.from_mail_to_send = "" 

77 self.from_mail_user = "" # or used or info 

78 self.__from_mail_default = "victor@reutenauer.eu" 

79 self.auth_valid = None 

80 

81 self.type_auth = type_auth 

82 

83 self.defaut_github_issue = "" 

84 self.github_token = "" 

85 

86 if lib_at != None: 

87 print("Using airtable authentification") 

88 self.type_auth = "airtable" 

89 self.lib_at = lib_at 

90 

91 # USED IN get_datou_exec_context_as_complete_param_json 

92 def get_info_from_mail_already_validated(self, from_mail): 

93 if self.type_auth == "hard_coded": 

94 return self.get_info_from_mail_hc(from_mail) 

95 elif self.type_auth == "airtable" : 

96 return self.get_info_from_mail_at(from_mail) 

97 else : 

98 print("Unexpected type_auth : " + str(self.type_auth)) 

99 return False, "", False, "", "" 

100 

101 def get_info_from_mail_hc(self, from_mail): 

102 self.from_mail_user = from_mail 

103 if from_mail == "support.message" or from_mail.endswith("@fotonower.com") or from_mail == "mendes.hugo1612@gmail.com" or from_mail == "ayoub.masmoudi@hotmail.fr" or from_mail == "fotonower@gmail.com" or from_mail == "speechtotext@artpollo.com" or from_mail == "vr@opio.fr" or from_mail == "speechtotext@artxpollo.com" or from_mail == "genevieve.houglet@gmail.com": 

104 self.from_mail_to_send = from_mail 

105 self.from_mail_user = "" 

106 self.auth_valid = True 

107 else : 

108 self.from_mail_to_send = "victor@reutenauer.eu" 

109 self.from_mail_user = from_mail 

110 self.auth_valid = False 

111 return self.auth_valid, "hard_coded", False, "", "" 

112 

113 def get_info_from_mail_at(self, from_mail): 

114 valid, conf, privacy, defaut_github_issue, github_token = self.lib_at.get_info_user_data_from_mail(from_mail) 

115 self.defaut_github_issue = defaut_github_issue 

116 self.github_token = github_token 

117 if valid: 

118 self.from_mail_to_send = from_mail 

119 self.from_mail_user = "" 

120 self.auth_valid = True 

121 else : 

122 self.from_mail_to_send = self.__from_mail_default 

123 self.from_mail_user = from_mail 

124 self.auth_valid = False 

125 return valid, conf, privacy, defaut_github_issue, github_token 

126 

127 def get_git_param(self): 

128 return self.defaut_github_issue, self.github_token 

129 

130 def get_message_mail_valid(self, verbose = False): 

131 if self.auth_valid == None: 

132 print("ERROR no info on validity of user") 

133 

134 message = "" 

135 if self.auth_valid : 

136 if verbose : 

137 print("Email valid, no info to pass !") 

138 else : 

139 message = "Email " + self.from_mail_user + " wasn't valid so we don't send the result to him directly, we could send a link to how to register !<br>\n" 

140 # TODO send link to self.from_mail_user to https://www.openpromptia.com/apia/assistant 

141 # if it is not you, please inform us by replying 

142 if verbose: 

143 print(message) 

144 

145 return message 

146 

147 def get_from_mail_to_send(self): 

148 return self.from_mail_to_send 

149 

150 # TODO Pour l'instant par composition 

151 # mais restera dans cette classe quand 

152 # on mettra le reste en abstract et qu'on procèdera par héritage 

153 def get_info_auth(self, otp = ""): 

154 

155 data = {} 

156 

157 print("Using airtable authentification") 

158 type_auth = "airtable" 

159 try: 

160 data = self.lib_at.get_otp_info(otp) 

161 except Exception as e: 

162 print("Lack of internet or airtable error, maybe useless") 

163 print(str(e)) 

164 

165 print(" Mouais : " + str(data)) 

166 

167 from datetime import datetime 

168 

169 # Parse the date string 

170 date_str = data["ExpiryDate"] if "ExpiryDate" in data else '2014-09-06T12:34:56.789Z' 

171 date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ') 

172 

173 data["ExpiryDateDT"] = date 

174 

175 # Get the current date 

176 current_date = datetime.now() #timezone.utc) 

177 

178 # Compare the dates 

179 if date > current_date: 

180 print("token is still valid.") 

181 data["is_valid"] = True 

182 elif date < current_date: 

183 print("Token has expired, we need to update this.") 

184 self.lib_at.update_otp_link(otp, status = "expired") 

185 data["is_valid"] = False 

186 else: 

187 print("The date is now (impossible).") 

188 exit(1) 

189 

190 return data 

191 

192 def connect_with_otp(self, otp): 

193 print("About to check " + str(otp)) 

194 info_connexion = self.get_info_auth(otp) 

195 

196 return info_connexion, info_connexion["is_valid"] 

197 

198 

199import hashlib 

200from datetime import datetime 

201 

202def create_id(): 

203 now = datetime.now() 

204 date_string = now.strftime("%Y%m%d%H%M%S%f") 

205 # Utilisation du module hashlib pour hasher la date string en hexadecimal 

206 hash_object = hashlib.sha256(date_string.encode()) 

207 hex_dig = hash_object.hexdigest() 

208 return hex_dig 

209# from uuid import uuid4 

210# return str(uuid4()) 

211 

212def build_layer_from_configuration(lss, complete_param_json): 

213 map_type_layer_inst = {} 

214 if lss != None: 

215 map_conf_selected, map_configuration_with_class_name = lss.load_configuration(with_class_name = True) 

216 

217 print(" map_conf_selected seems useless here : " + str(map_conf_selected)) 

218 

219 for layer_type in map_configuration_with_class_name: 

220 # layer_type = conf["layer_type"] 

221 class_name = map_configuration_with_class_name[layer_type]["class_name"] 

222 module_name = map_configuration_with_class_name[layer_type]["module_name"] 

223 if class_name != "LayerGeneric": 

224 # module_name = 'lib.lib_openai' 

225 import importlib 

226 module = importlib.import_module(module_name) 

227 

228 # class_name = conf["class_name"] 

229 from lib.brick_layers.lib_abstract_generic_layer import LayerPrompt 

230 # from lib.lib_openai import LayerPromptOpenAI 

231 # import lib 

232 my_class = getattr(module, class_name) 

233 inst = my_class(complete_param_json) 

234 # list_globals = globals() 

235 # if class_name in list_globals : 

236 # inst = list_globals[class_name](complete_param_json) 

237 # else : 

238 # print("Missing global : " + str(class_name)) 

239 # inst = None 

240 map_type_layer_inst[layer_type] = inst 

241 

242 return map_type_layer_inst 

243 

244def get_datou_exec_context_as_complete_param_json(from_mail = "info@opio.fr", verbose = False, 

245 privacy = False, openai_token = "", lss=None, project_id=None): 

246# valid, user_conf, privacy_db, defaut_github_issue, github_token = lss.lib_user_data_external.get_info_from_mail_already_validated(from_mail) 

247 user_conf = {} 

248 defaut_github_issue = None 

249 github_token = "" 

250 # Je le mets dans datou_exec (vu que safia envoie aussi des mails) 

251 # PAs sur, car je n'ai pas d'objet pour l'instant ! 

252 info_auth = "notused" 

253# info_auth = lss.lib_user_data_external.get_message_mail_valid() 

254# from_mail_to_send = lss.lib_user_data_external.get_from_mail_to_send() 

255# defaut_github_issue, github_token = lss.lib_user_data_external.get_git_param() 

256 from_mail_to_send = "vr@opio.fr" 

257 defaut_github_issue = None 

258 github_token = "" 

259 

260 config_project = lss.lib_user_data_internal.load_conf_project(project_id) if lss.lib_user_data_internal != None else {} 

261 

262 hash_id_treatment = create_id() 

263 

264 complete_param_json = {} 

265 param_json = {"info_auth" : info_auth, "hash_id_treatment" : hash_id_treatment, 

266"privacy" : privacy, "from_mail_to_send" : from_mail_to_send, 

267"config_project" : config_project} 

268 complete_param_json.update(param_json) 

269 

270 param_json = {"google_token" : None} 

271 complete_param_json.update(param_json) 

272 if openai_token == "": 

273 from auth.lib_conf_system import lcs_global_singleton 

274 openai_token = lcs_global_singleton.get_openai_api_key() 

275 param_json = {"openai_token" : openai_token} 

276 complete_param_json.update(param_json) 

277 param_json = {"gpt_model" : "gpt-4-1106-preview"} # gpt-4 gpt-3.5-turbo 

278 complete_param_json.update(param_json) 

279 

280 git_param_json = {"defaut_github_issue" : defaut_github_issue, "github_token" : github_token, "privacy" : privacy} 

281 complete_param_json.update(git_param_json) 

282 

283 complete_param_json["project_id"] = project_id 

284 if lss != None: 

285 complete_param_json["user_id"] = lss.get_user_id() 

286 

287 if lss != None: 

288 info_project = lss.get_project_info(project_id) 

289 if "table_documents" in info_project: 

290 table_documents = info_project["table_documents"] 

291 import_json_param_json = {"table_documents" : table_documents} 

292 complete_param_json.update(import_json_param_json) 

293 

294 if "function_search_documents" in info_project: 

295 function_search_documents = info_project["function_search_documents"] 

296 search_NN_param_json = {"match_page_sections" : function_search_documents} 

297 complete_param_json.update(search_NN_param_json) 

298 

299 param_json_save_result = {"user" : from_mail} 

300 complete_param_json.update(param_json_save_result) 

301 

302# configurations = lss.lib_data.get_configuration_option() 

303 

304 if lss == None: 

305 print("lss is needed to configure correctly the datou steps with external api configured in the pg mtr.mtradmin db") 

306 else : 

307 map_type_layer_inst = build_layer_from_configuration(lss, complete_param_json) 

308 

309 if "layers" in config_project: 

310 for k in config_project["layers"]: 

311 if "name" not in config_project["layers"][k] or "model" not in config_project["layers"][k]: 

312 continue 

313 configuration_Value = config_project["layers"][k]["name"] 

314 # ollama_on_rene 

315 map_config = lss.lib_user_data_internal.get_configuration_option_one(configuration_Value) 

316 

317 if "model" in config_project["layers"][k]: 

318 complete_param_json["gpt_model"] = config_project["layers"][k]["model"] 

319 

320 if "format" in config_project["layers"][k]: 

321 complete_param_json["format_structured_output"] = config_project["layers"][k]["format"] 

322 

323 if len(map_config) == 1: 

324 layer_type = map_config[0]["layer_type"] 

325 if layer_type != k: 

326 print("Unexpected 311 !") 

327 else: 

328 class_name = map_config[0]["class_name"] 

329 module_name = map_config[0]["module_name"] 

330 if class_name != "LayerGeneric": 

331 # module_name = 'lib.lib_openai' 

332 import importlib 

333 module = importlib.import_module(module_name) 

334 

335 my_class = getattr(module, class_name) 

336 inst = my_class(complete_param_json) 

337 map_type_layer_inst[layer_type] = inst 

338 else : 

339 print("Unexpected 314 !") 

340 

341 # Contexte execution traitement datou sans PG, VR 2023 

342 complete_param_json["map_type_layer_inst"] = map_type_layer_inst 

343 

344 return complete_param_json 

345 

346