+ def __repr__(self):
+ return '<Keyring %s>' % self.keyring_name
+
+ def de_escape_gpg_str(self, txt):
+ esclist = re.split(r'(\\x..)', txt)
+ for x in range(1,len(esclist),2):
+ esclist[x] = "%c" % (int(esclist[x][2:],16))
+ return "".join(esclist)
+
+ def parse_address(self, uid):
+ """parses uid and returns a tuple of real name and email address"""
+ import email.Utils
+ (name, address) = email.Utils.parseaddr(uid)
+ name = re.sub(r"\s*[(].*[)]", "", name)
+ name = self.de_escape_gpg_str(name)
+ if name == "":
+ name = uid
+ return (name, address)
+
+ def load_keys(self, keyring):
+ if not self.keyring_id:
+ raise Exception('Must be initialized with database information')
+
+ k = os.popen(self.gpg_invocation % keyring, "r")
+ key = None
+ need_fingerprint = False
+
+ for line in k:
+ field = line.split(":")
+ if field[0] == "pub":
+ key = field[4]
+ self.keys[key] = {}
+ (name, addr) = self.parse_address(field[9])
+ if "@" in addr:
+ self.keys[key]["email"] = addr
+ self.keys[key]["name"] = name
+ need_fingerprint = True
+ elif key and field[0] == "uid":
+ (name, addr) = self.parse_address(field[9])
+ if "email" not in self.keys[key] and "@" in addr:
+ self.keys[key]["email"] = addr
+ self.keys[key]["name"] = name
+ elif need_fingerprint and field[0] == "fpr":
+ self.keys[key]["fingerprints"] = [field[9]]
+ self.fpr_lookup[field[9]] = key
+ need_fingerprint = False
+
+ def import_users_from_ldap(self, session):
+ import ldap
+ cnf = Config()
+
+ LDAPDn = cnf["Import-LDAP-Fingerprints::LDAPDn"]
+ LDAPServer = cnf["Import-LDAP-Fingerprints::LDAPServer"]
+ ca_cert_file = cnf.get('Import-LDAP-Fingerprints::CACertFile')
+
+ l = ldap.open(LDAPServer)
+
+ if ca_cert_file:
+ # TODO: This should request a new context and use
+ # connection-specific options (i.e. "l.set_option(...)")
+
+ # Request a new TLS context. If there was already one, libldap
+ # would not change the TLS options (like which CAs to trust).
+ #l.set_option(ldap.OPT_X_TLS_NEWCTX, True)
+ ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
+ #ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, None)
+ ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, ca_cert_file)
+ l.start_tls_s()
+
+ l.simple_bind_s("","")
+ Attrs = l.search_s(LDAPDn, ldap.SCOPE_ONELEVEL,
+ "(&(keyfingerprint=*)(gidnumber=%s))" % (cnf["Import-Users-From-Passwd::ValidGID"]),
+ ["uid", "keyfingerprint", "cn", "mn", "sn"])
+
+ ldap_fin_uid_id = {}
+
+ byuid = {}
+ byname = {}
+
+ for i in Attrs:
+ entry = i[1]
+ uid = entry["uid"][0]
+ name = get_ldap_name(entry)
+ fingerprints = entry["keyFingerPrint"]
+ keyid = None
+ for f in fingerprints:
+ key = self.fpr_lookup.get(f, None)
+ if key not in self.keys:
+ continue
+ self.keys[key]["uid"] = uid
+
+ if keyid != None:
+ continue
+ keyid = get_or_set_uid(uid, session).uid_id
+ byuid[keyid] = (uid, name)
+ byname[uid] = (keyid, name)
+
+ return (byname, byuid)
+
+ def generate_users_from_keyring(self, format, session):
+ byuid = {}
+ byname = {}
+ any_invalid = False
+ for x in self.keys.keys():
+ if "email" not in self.keys[x]:
+ any_invalid = True
+ self.keys[x]["uid"] = format % "invalid-uid"
+ else:
+ uid = format % self.keys[x]["email"]
+ keyid = get_or_set_uid(uid, session).uid_id
+ byuid[keyid] = (uid, self.keys[x]["name"])
+ byname[uid] = (keyid, self.keys[x]["name"])
+ self.keys[x]["uid"] = uid
+
+ if any_invalid:
+ uid = format % "invalid-uid"
+ keyid = get_or_set_uid(uid, session).uid_id
+ byuid[keyid] = (uid, "ungeneratable user id")
+ byname[uid] = (keyid, "ungeneratable user id")
+
+ return (byname, byuid)
+
+__all__.append('Keyring')
+
+@session_wrapper
+def get_keyring(keyring, session=None):
+ """
+ If C{keyring} does not have an entry in the C{keyrings} table yet, return None
+ If C{keyring} already has an entry, simply return the existing Keyring
+
+ @type keyring: string
+ @param keyring: the keyring name
+
+ @rtype: Keyring
+ @return: the Keyring object for this keyring
+ """
+
+ q = session.query(Keyring).filter_by(keyring_name=keyring)
+
+ try:
+ return q.one()
+ except NoResultFound:
+ return None
+
+__all__.append('get_keyring')
+
+@session_wrapper
+def get_active_keyring_paths(session=None):
+ """
+ @rtype: list
+ @return: list of active keyring paths
+ """
+ return [ x.keyring_name for x in session.query(Keyring).filter(Keyring.active == True).order_by(desc(Keyring.priority)).all() ]
+
+__all__.append('get_active_keyring_paths')
+
+@session_wrapper
+def get_primary_keyring_path(session=None):
+ """
+ Get the full path to the highest priority active keyring
+
+ @rtype: str or None
+ @return: path to the active keyring with the highest priority or None if no
+ keyring is configured
+ """
+ keyrings = get_active_keyring_paths()
+
+ if len(keyrings) > 0:
+ return keyrings[0]
+ else:
+ return None