# -*- coding: utf-8 -*-
import base64
from enum import Enum, auto
from OpenSSL import crypto
from OpenSSL.crypto import X509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from .curp import CURP
from .rfc import RFC, RFCType
from ..exceptions import CFDIError
REGIMEN_SOCIETARIOS = {
'AC': 'AC - Asociación civil',
'S DE CV': 'Sociedad de capital variable',
'S DE RL DE CV': 'Sociedad de responsabilidad limitada de capital variable',
'S DE RL': 'Sociedad de responsabilidad limitada',
'S EN C DE CV': 'Sociedad en comandita simple de capital variable',
'S EN C POR A DE CV': 'Sociedad en comandita por acciones de capital variable',
'S EN C POR A': 'Sociedad en comandita por acciones',
'S EN C': 'Sociedad en comandita simple',
'S EN NC DE CV': 'Sociedad en nombre colectivo de capital variable',
'S EN NC': 'Sociedad en nombre colectivo',
'SA DE CV': 'Sociedad anónima de capital variable',
'SA': 'Sociedad anónima',
'SAB DE CV': 'Sociedad anómina bursátil de capital variable',
'SAB': 'Sociedad anómina bursátil',
'SAPI DE CV': 'Sociedad anónima promotora de inversión de capital variable',
'SAPI': 'Sociedad anónima promotora de inversión',
'SAPIB': 'Sociedad anónima promotora de inversión bursátil',
'SAS DE CV': 'Sociedad por acciones simplificada de capital variable',
'SAS': 'Sociedad por acciones simplificada',
'SC DE CV': 'Sociedad civil de capital variable',
'SC': 'Sociedad civil',
}
[docs]class CertificateType(Enum):
Fiel = auto()
CSD = auto()
[docs]class Certificate:
def __init__(self, certificate: X509):
self.certificate = certificate
[docs] @classmethod
def load_certificate(cls, certificate: bytes) -> 'Certificate':
return cls(crypto.load_certificate(crypto.FILETYPE_ASN1, certificate))
[docs] def fingerprint(self, algorithm=hashes.SHA1()) -> bytes:
return self.certificate.to_cryptography().fingerprint(algorithm=algorithm)
[docs] def certificate_bytes(self) -> bytes:
return crypto.dump_certificate(crypto.FILETYPE_ASN1, self.certificate)
[docs] def certificate_base64(self) -> str:
return base64.b64encode(
self.certificate_bytes()
).decode()
[docs] def issuer(self) -> str:
# return self.certificate.to_cryptography().issuer.rfc4514_string()
d = self.certificate.get_issuer().get_components()
return ','.join(f'{k.decode()}={v.decode()}' for k, v in reversed(d))
[docs] def subject(self) -> str:
d = self.certificate.get_subject().get_components()
return ','.join(f'{k.decode()}={v.decode()}' for k, v in reversed(d))
@property
def type(self):
if self.certificate.get_extension_count() == 4:
return CertificateType.Fiel
if self.certificate.get_extension_count() == 2:
return CertificateType.CSD
return None
@property
def serial_number(self) -> int:
return self.certificate.get_serial_number()
# useful data
@property
def legal_name(self) -> str | None:
try:
at = self.certificate.get_subject().O
if self.rfc.type == RFCType.MORAL:
for r in REGIMEN_SOCIETARIOS:
if at.endswith(" " + r):
return at[:-len(r) - 1]
return at
except AttributeError:
return None
@property
def rfc(self) -> RFC | None:
try:
at = self.certificate.get_subject().x500UniqueIdentifier
at = at.split("/")[0].strip()
return RFC(at)
except AttributeError:
return None
@property
def rfc_representante(self) -> RFC | None:
try:
at = self.certificate.get_subject().x500UniqueIdentifier
at = at.split("/")[1].strip()
return RFC(at)
except AttributeError:
return None
@property
def curp(self) -> CURP | None:
try:
at = self.certificate.get_subject().serialNumber
at = at.split("/")[0].strip()
return CURP(at) if at else None
except AttributeError:
return None
@property
def curp_representante(self) -> CURP | None:
try:
at = self.certificate.get_subject().serialNumber
at = at.split("/")[1].strip()
return CURP(at)
except AttributeError:
return None
@property
def branch_name(self) -> str | None:
try:
return self.certificate.get_subject().OU
except AttributeError:
return None
@property
def rfc_pac(self) -> RFC:
if nom_suc := self.branch_name:
if nom_suc.startswith("PAC"):
return RFC(nom_suc[3:15])
if nom_suc.startswith("MEGAPAC"):
return RFC(nom_suc[7:19])
raise CFDIError("Certificado no es PAC")
@property
def email(self) -> str | None:
try:
return self.certificate.get_subject().emailAddress
except AttributeError:
return None
@property
def certificate_number(self) -> str:
return f'{self.certificate.get_serial_number():x}'[1::2]
[docs] def public_key(self) -> rsa.RSAPublicKey:
return self.certificate.get_pubkey().to_cryptography_key()
def _verify(self, data, signature, algorithm) -> bool:
try:
self.public_key().verify(
signature=signature,
data=data,
padding=padding.PKCS1v15(),
algorithm=algorithm
)
return True
except InvalidSignature:
return False
[docs] def verify_sha1(self, data: bytes, signature: bytes) -> bool:
return self._verify(
data=data,
signature=signature,
algorithm=hashes.SHA1()
)
[docs] def verify_sha256(self, data: bytes, signature: bytes) -> bool:
return self._verify(
data=data,
signature=signature,
algorithm=hashes.SHA256()
)
[docs] def encrypt(self, data: bytes):
return self.public_key().encrypt(
plaintext=data,
padding=padding.PKCS1v15()
)