import os
from datetime import date, datetime
from decimal import Decimal
from io import BytesIO
from typing import Sequence
from .catalog import *
from .code import *
from .utils import _format_rfc, DIOTWriter, encrypt_triple_des, period_code, catalog_code
from ..ans1e import *
from ..models import RFC, RFCType, CURP, Certificate
from ..utils import iterate
__all__ = [
'DIOT',
'DatosIdentificacion',
'ProveedorTercero',
'DatosComplementaria',
'TipoOperacion',
'TipoTercero',
'Periodo',
'Pais'
]
from ..zip import zip_create, ZipData
current_dir = os.path.dirname(__file__)
VERSION_DEMC = 10
CODES = "0123456789ABCDEFGHIJKLMNOPQRSTUV"
CERTIFICATE = Certificate.load_certificate(
certificate=open(os.path.join(current_dir, 'De_srv_x509.CER'), 'rb').read()
)
[docs]
class DatosIdentificacion:
def __init__(
self,
rfc: str | RFC,
ejercicio: int,
curp: str = None,
nombre: str = None,
apellido_paterno: str = None,
apellido_materno: str = None,
razon_social: str = None):
self.rfc = RFC(rfc)
self.ejercicio = ejercicio
self.curp = CURP(curp) if curp else None
self.nombre = nombre.upper() if nombre else None
self.apellido_paterno = apellido_paterno.upper() if apellido_paterno else None
self.apellido_materno = apellido_materno.upper() if apellido_materno else None
self.razon_social = razon_social.upper() if razon_social else None
[docs]
def to_dict(self):
return {
"RFC": self.rfc,
"CURP": self.curp,
"Ejercicio": self.ejercicio,
"ApellidoPaterno": self.apellido_paterno,
"ApellidoMaterno": self.apellido_materno,
"Nombre": self.nombre,
"RazonSocial": self.razon_social
}
[docs]
def render(self, w: DIOTWriter):
w(10001, 0, 0, _format_rfc(self.rfc))
if self.rfc.type == RFCType.FISICA:
w(10002, 0, 0, self.curp)
w(10021, 0, 0, self.ejercicio)
if self.rfc.type == RFCType.FISICA:
w(10003, 0, 0, self.apellido_paterno)
w(10004, 0, 0, self.apellido_materno)
w(10005, 0, 0, self.nombre)
else:
w(10006, 0, 0, self.razon_social)
[docs]
class ProveedorTercero:
def __init__(
self,
tipo_tercero: TipoTercero,
tipo_operacion: TipoOperacion,
rfc: str | RFC = None,
id_fiscal: str = None,
nombre_extranjero: str = None,
pais: Pais = None,
nacionalidad: str = None,
iva16: int = None,
iva16_na: int = None,
iva_rfn: int = None,
iva_rfn_na: int = None,
iva_import16: int = None,
iva_import16_na: int = None,
iva_import_exento: int = None,
iva0: int = None,
iva_exento: int = None,
retenido: int = None,
devoluciones: int = None):
"""
:param tipo_tercero:
:param tipo_operacion:
:param rfc:
:param id_fiscal: Numero de ID Fiscal
:param nombre_extranjero: Nombre del Extranjero
:param pais: Pais de residencia
:param nacionalidad: Nacionalidad
:param iva16: Total de los actos o actividades pagados a la tasa 16% de IVA
:param iva16_na: Total del IVA pagado no acreditable a la tasa 16%
:param iva_rfn: Total de los actos o actividades pagados sujeto al estimulo de la region fronteriza norte
:param iva_rfn_na: Total del IVA pagado no acreditable sujeto al estimulo de la region fronteriza norte
:param iva_import16: Total de los actos o actividades pagados en la importacion de bienes y servicios a la tasa 16%
:param iva_import16_na: Total del IVA pagado no acreditable en la importacion de bienes y servicios a la tasa 16%
:param iva_import_exento: Total de los actos o actividades pagados en la importacion de bienes y servicios por los que no se pagara IVA (Exentos)
:param iva0: Total de los actos o actividades pagados a la tasa 0% de IVA
:param iva_exento: Total de los actos o actividades pagados por los que no se pagara IVA (Exentos)
:param retenido: Total del IVA Retenido por el contribuyente
:param devoluciones: Total del IVA correspondiente a las devoluciones, descuentos y bonificaciones sobre compras
"""
self.tipo_tercero = tipo_tercero
self.tipo_operacion = tipo_operacion
self.rfc = RFC(rfc) if rfc else None
self.id_fiscal = id_fiscal
self.nombre_extranjero = nombre_extranjero
self.pais = pais
self.nacionalidad = nacionalidad
self.iva16 = iva16
self.iva16_na = iva16_na
self.iva_rfn = iva_rfn
self.iva_rfn_na = iva_rfn_na
self.iva_import16 = iva_import16
self.iva_import16_na = iva_import16_na
self.iva_import_exento = iva_import_exento
self.iva0 = iva0
self.iva_exento = iva_exento
self.retenido = retenido
self.devoluciones = devoluciones
match self.tipo_tercero:
case TipoTercero.PROVEEDOR_NACIONAL:
if not self.rfc:
raise ValueError("RFC is required for NACIONAL")
case TipoTercero.PROVEEDOR_EXTRANJERO:
pass
case TipoTercero.PROVEEDOR_GLOBAL:
if self.rfc:
raise ValueError("RFC must be empty for GLOBAL")
[docs]
def to_dict(self):
return {
"TipoTercero": catalog_code(TIPO_TERC, self.tipo_tercero),
"TipoOperacion": catalog_code(TIP_OPERA, self.tipo_operacion),
"RFC": self.rfc,
"NumeroIDFiscal": self.id_fiscal,
"NombreExtranjero": self.nombre_extranjero,
"PaisResidencia": catalog_code(PAISES, self.pais),
"Nacionalidad": self.nacionalidad,
"ActividadesIVATasa16": self.iva16,
"NoAcreditableIVATasa16": self.iva16_na,
"ActividadesRegionFronterizaNorte": self.iva_rfn,
"NoAcreditableRegionFronterizaNorte": self.iva_rfn_na,
"ActividadesImportacionIVATasa16": self.iva_import16,
"NoAcreditableImportacionIVATasa16": self.iva_import16_na,
"ActividadesImportacionExento": self.iva_import_exento,
"ActividadesIVATasa0": self.iva0,
"ActividadesIVAExento": self.iva_exento,
"Retenido": self.retenido,
"Devoluciones": self.devoluciones
}
[docs]
def render(self, w: DIOTWriter):
w(200261, 1, 1, int(self.tipo_tercero))
w(200361, 1, 1, int(self.tipo_operacion))
if self.id_fiscal:
w(200561, 1, 1, self.id_fiscal)
if self.nombre_extranjero:
w(200661, 1, 1, self.nombre_extranjero)
if self.pais:
w(200761, 1, 1, self.pais)
if self.nacionalidad:
w(200861, 1, 1, self.nacionalidad)
if self.rfc:
w(200461, 1, 1, _format_rfc(self.rfc))
if self.iva16 is not None:
w(301061, 1, 1, self.iva16)
if self.iva16_na is not None:
w(301161, 1, 1, self.iva16_na)
if self.iva_rfn is not None:
w(302461, 1, 1, self.iva_rfn)
if self.iva_rfn_na is not None:
w(301361, 1, 1, self.iva_rfn_na)
if self.iva_import16 is not None:
w(301461, 1, 1, self.iva_import16)
if self.iva_import16_na is not None:
w(301561, 1, 1, self.iva_import16_na)
if self.iva_import_exento is not None:
w(301861, 1, 1, self.iva_import_exento)
if self.iva0 is not None:
w(301961, 1, 1, self.iva0)
if self.iva_exento is not None:
w(302061, 1, 1, self.iva_exento)
if self.retenido is not None:
w(302161, 1, 1, self.retenido)
if self.devoluciones is not None:
w(302261, 1, 1, self.devoluciones)
[docs]
class DatosComplementaria:
def __init__(
self,
folio_anterior: str,
fecha_presentacion_anterior: date
):
self.folio_anterior = folio_anterior
self.fecha_presentacion_anterior = fecha_presentacion_anterior
[docs]
def render(self, w: DIOTWriter):
w(361, 0, 0, self.folio_anterior)
w(461, 0, 0, self.fecha_presentacion_anterior.strftime('%Y%m%d'))
[docs]
def to_dict(self):
return {
"FolioAnterior": self.folio_anterior,
"FechaPresentacionAnterior": self.fecha_presentacion_anterior
}
class Totales:
def __init__(self, providers: Sequence[ProveedorTercero]):
self.operations_count = len(providers)
self.iva16 = sum(p.iva16 or 0 for p in providers)
self.iva16_na = sum(p.iva16_na or 0 for p in providers)
self.iva_rfn = sum(p.iva_rfn or 0 for p in providers)
self.iva_rfn_na = sum(p.iva_rfn_na or 0 for p in providers)
self.iva_import16 = sum(p.iva_import16 or 0 for p in providers)
self.iva_import16_na = sum(p.iva_import16_na or 0 for p in providers)
self.iva_import_exento = sum(p.iva_import_exento or 0 for p in providers)
self.iva0 = sum(p.iva0 or 0 for p in providers)
self.iva_exento = sum(p.iva_exento or 0 for p in providers)
self.retenido = sum(p.retenido or 0 for p in providers)
self.devoluciones = sum(p.devoluciones or 0 for p in providers)
# N102561=®((((N101261*16)/100)+((N101461*11)/100)+((N102761*15)/100)+((N102961*16)/100)*0.5)-(10/1000))
# Note: N102961 es N102861
self.total = round(self.iva16 * Decimal(".16") + self.iva_rfn * Decimal(".08") - Decimal(".01"))
# N102661=®(((N101661*16)/100)+((N101861*11)/100)-(10/1000))
self.total_import = round(self.iva_import16 * Decimal(".16") - Decimal(".01"))
def to_dict(self):
return {
"TotalOperaciones": self.operations_count,
"ActividadesIVATasa16": self.iva16,
"NoAcreditableIVATasa16": self.iva16_na,
"ActividadesRegionFronterizaNorte": self.iva_rfn,
"NoAcreditableRegionFronterizaNorte": self.iva_rfn_na,
"ActividadesImportacionIVATasa16": self.iva_import16,
"NoAcreditableImportacionIVATasa16": self.iva_import16_na,
"ActividadesImportacionExento": self.iva_import_exento,
"ActividadesIVATasa0": self.iva0,
"ActividadesIVAExento": self.iva_exento,
"Retenido": self.retenido,
"Devoluciones": self.devoluciones,
"TotalExceptoImportaciones": self.total,
"TotalImportaciones": self.total_import
}
def render(self, w: DIOTWriter):
w(100161, 1, 0, self.operations_count)
w(101261, 1, 0, self.iva16)
w(101361, 1, 0, self.iva16_na)
w(101461, 1, 0, 0)
w(102861, 1, 0, self.iva_rfn)
w(101561, 1, 0, self.iva_rfn_na)
w(101661, 1, 0, self.iva_import16)
w(101761, 1, 0, self.iva_import16_na)
w(101861, 1, 0, 0)
w(101961, 1, 0, 0)
w(102061, 1, 0, self.iva_import_exento)
w(102161, 1, 0, self.iva0)
w(102261, 1, 0, self.iva_exento)
w(102361, 1, 0, self.retenido)
w(102461, 1, 0, self.devoluciones)
w(102561, 1, 0, self.total)
w(102661, 1, 0, self.total_import)
class DatosGenerales:
def __init__(
self,
declaracion: str,
periodo: Periodo,
complementaria: DatosComplementaria = None,
):
self.periodo = periodo
self.periodicidad = period_code(self.periodo)
self.complementaria = complementaria
if complementaria:
self.type = "2"
else:
self.type = "1"
self.declaracion = declaracion
def render(self, w: DIOTWriter):
w(161, 0, 0, self.declaracion)
w(261, 0, 0, self.type)
if self.complementaria:
self.complementaria.render(w)
w(661, 0, 0, int(self.periodicidad))
w(561, 0, 0, int(self.periodo))
def to_dict(self):
return {
"Declaracion": catalog_code(PRESENTA, self.declaracion),
"TipoDeclaracion": catalog_code(TIP_DECLA, self.type),
"Complementaria": self.complementaria.to_dict() if self.complementaria else None,
"Periodicidad": catalog_code(PERIODIC, self.periodicidad),
"Periodo": catalog_code(PERIODO, self.periodo)
}
[docs]
class DIOT:
tag = "DIOT"
def __init__(
self,
datos_identificacion: DatosIdentificacion,
periodo: Periodo,
complementaria: DatosComplementaria = None,
proveedores: ProveedorTercero | Sequence[ProveedorTercero] = None
):
proveedores = list(iterate(proveedores))
self.datos_identificacion = datos_identificacion
self.datos_generales = DatosGenerales(
declaracion="1" if proveedores else "0",
periodo=periodo,
complementaria=complementaria
)
self.proveedores = proveedores
self.totales = Totales(proveedores)
[docs]
def to_dict(self):
return {
"DatosIdentificacion": self.datos_identificacion.to_dict(),
"DatosGenerales": self.datos_generales.to_dict(),
"Proveedores": [p.to_dict() for p in self.proveedores],
"Totales": self.totales.to_dict(),
}
[docs]
def render(self, w: DIOTWriter):
w(40006, 0, 0, '20062')
w(10040, 0, 0, '081')
self.datos_identificacion.render(w)
self.datos_generales.render(w)
if self.proveedores:
self.totales.render(w)
for i, p in enumerate(self.proveedores, start=1):
w(200161, 1, 1, i)
p.render(w)
w.end()
def _tmp_filename(self):
rfc = self.datos_identificacion.rfc
if len(rfc) == 12:
rfc = f"_{rfc}"
v_dem = "0"
n_formulario = 'DOT'
ver_clte = CODES[VERSION_DEMC]
ver_form = CODES[10]
mes_inicial = CODES[1]
mes_final = CODES[12]
period = CODES[self.datos_identificacion.ejercicio % 100]
x_date = datetime.now()
anio = CODES[x_date.year % 100]
mes = CODES[x_date.month]
dia = CODES[x_date.day]
hora = x_date.strftime("%H%M")
suffix = "2" if len(self.proveedores) > 40000 else "1"
return f"{rfc}{v_dem}{n_formulario}{ver_clte}{ver_form}{period}{mes_inicial}{period}{mes_final}{anio}{mes}{dia}{hora}{suffix}"
[docs]
def plain_bytes(self) -> bytes:
with BytesIO() as b:
self.plain_write(b)
return b.getvalue()
[docs]
def plain_write(self, target):
w = DIOTWriter(target)
self.render(w)
def _zip_bytes(self, tmp_filename) -> bytes:
with BytesIO() as b:
self._zip_write(b, tmp_filename)
return b.getvalue()
def _zip_write(self, target: BytesIO, tmp_filename):
zip_create(target, [
ZipData(f"C:\\{tmp_filename}.txt", lambda s: self.render(DIOTWriter(s)))
])
def _encrypted_bytes(self, tmp_filename) -> bytes:
with BytesIO() as b:
self._encrypted_write(b, tmp_filename)
return b.getvalue()
def _encrypted_write(self, target, tmp_filename):
with BytesIO() as zip_data:
self._zip_write(zip_data, tmp_filename)
algo, key, iv, data_encrypted = encrypt_triple_des(zip_data)
issuer = CERTIFICATE.certificate.get_issuer().der()
enc_key = CERTIFICATE.encrypt(key)
# SERIALIZE ANS1 FILE
e = Ans1Encoder()
with e.seq():
e.oid("1.2.840.113549.1.7.3")
with e.enter(nr=0, cls=Classes.Context):
with e.seq():
e(0)
with e.set():
with e.seq():
e(0)
with e.seq():
e.write(issuer)
e(CERTIFICATE.serial_number)
with e.seq():
e.oid("1.2.840.113549.1.1.1")
e()
e(enc_key)
with e.seq():
e.oid("1.2.840.113549.1.7.1")
with e.seq():
e.oid(algo)
e(iv)
e(data_encrypted.getvalue(), nr=0, cls=Classes.Context)
e.stream(target)
[docs]
def generate_package(self, dirname: str = None) -> str:
"""
Generate Package, return filename
"""
tmp_filename = self._tmp_filename()
if dirname:
filename = os.path.join(dirname, tmp_filename + ".dec")
else:
filename = tmp_filename + ".dec"
with open(filename, 'wb') as f:
self._encrypted_write(f, tmp_filename)
return filename