Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion sale_order_import/tests/test_order_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from unittest import mock

from odoo import exceptions
from odoo.tests import Form
from odoo.tests import Form, RecordCapturer

from .common import TestCommon

Expand Down Expand Up @@ -258,3 +258,114 @@ def test_order_import_log_errored_line(self):
lambda m: messages[0] in m.body and messages[1] in m.body
)
)

def test_create_missing_invoice_partner(self):
"""Tests creation of missing invoice partner when ctx flag is True

Expected behavior: the import workflow is not halted, and a new invoicing
partner is created on the fly.
"""
parsed_order = dict(
self.parsed_order,
invoice_to={
"country_code": "FR",
"email": "test@invoice.partner",
"name": "Test Invoice Address",
},
)
wiz = self.wiz_model.with_context(create_missing_invoice_partner=True)
with RecordCapturer(self.env["res.partner"], []) as rc_partner:
order = wiz.create_order(parsed_order, "pricelist")
self.assertEqual(len(rc_partner.records), 1)
invoice_partner = rc_partner.records
self.assertEqual(invoice_partner.type, "invoice")
self.assertEqual(invoice_partner.parent_id, self.partner)
self.assertEqual(invoice_partner.country_id, self.env.ref("base.fr"))
self.assertEqual(invoice_partner.email, "test@invoice.partner")
self.assertEqual(invoice_partner.name, "Test Invoice Address")
self.assertEqual(order.partner_invoice_id, invoice_partner)
self.assertIn("Created invoice partner", order.message_ids[0].body)

def test_create_missing_invoice_partner_disabled(self):
"""Tests creation of missing invoice partner when ctx flag is False or not set

Expected behavior: the import workflow is halted with an error.
"""
parsed_order = dict(
self.parsed_order,
invoice_to={
"country_code": "FR",
"email": "test@invoice.partner",
"name": "Test Invoice Address",
},
)
wiz = self.wiz_model
Comment thread
simahawk marked this conversation as resolved.

# No context flag
ctx = dict(wiz.env.context)
ctx.pop("create_missing_invoice_partner", None)
wiz = wiz.with_context(ctx) # pylint: disable=context-overridden
with self.assertRaises(exceptions.UserError):
wiz.create_order(parsed_order, "pricelist")

# Context flag set to False
wiz = wiz.with_context(create_missing_invoice_partner=False)
with self.assertRaises(exceptions.UserError):
wiz.create_order(parsed_order, "pricelist")

def test_create_missing_shipping_partner(self):
"""Tests creation of missing shipping partner when ctx flag is True

Expected behavior: the import workflow is not halted, and a new shipping
partner is created on the fly.
"""
parsed_order = dict(
self.parsed_order,
ship_to={
"city": "Rome",
"country_code": "IT",
"street": "Via dei Platani",
"zip": "00100",
},
)
wiz = self.wiz_model.with_context(create_missing_shipping_partner=True, test=1)
with RecordCapturer(self.env["res.partner"], []) as rc_partner:
order = wiz.create_order(parsed_order, "pricelist")
self.assertEqual(len(rc_partner.records), 1)
shipping_partner = rc_partner.records
self.assertEqual(shipping_partner.type, "delivery")
self.assertEqual(shipping_partner.parent_id, self.partner)
self.assertEqual(shipping_partner.city, "Rome")
self.assertEqual(shipping_partner.country_id, self.env.ref("base.it"))
self.assertEqual(shipping_partner.street, "Via dei Platani")
self.assertEqual(shipping_partner.zip, "00100")
self.assertEqual(order.partner_shipping_id, shipping_partner)
self.assertIn("Created shipping partner", order.message_ids[0].body)

def test_create_missing_shipping_partner_disabled(self):
"""Tests creation of missing shipping partner when ctx flag is False or not set

Expected behavior: the import workflow is halted with an error.
"""
parsed_order = dict(
self.parsed_order,
ship_to={
"city": "Rome",
"country_code": "IT",
"street": "Via dei Platani",
"zip": "00100",
},
)
wiz = self.wiz_model

# No context flag
ctx = dict(wiz.env.context)
ctx.pop("create_missing_shipping_partner", None)
wiz = wiz.with_context(ctx) # pylint: disable=context-overridden
with self.assertRaises(exceptions.UserError):
wiz.create_order(parsed_order, "pricelist")

# Context flag set to False
wiz = wiz.with_context(create_missing_shipping_partner=False)
with self.assertRaises(exceptions.UserError):
wiz.create_order(parsed_order, "pricelist")
182 changes: 170 additions & 12 deletions sale_order_import/wizard/sale_order_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# @author: Simone Orsi <simahawk@gmail.com>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import copy
import logging
import mimetypes
from base64 import b64decode, b64encode
Expand Down Expand Up @@ -62,6 +63,9 @@ class SaleOrderImport(models.TransientModel):
skip_error_lines = fields.Boolean(
help="Ignore and push all error lines to the chatter when importing if enabled."
)
# Allow creating invoice/shipping partners on import
create_missing_invoice_partner = fields.Boolean(default=False)
create_missing_shipping_partner = fields.Boolean(default=False)

@api.onchange("order_file")
def order_file_change(self):
Expand Down Expand Up @@ -196,6 +200,11 @@ def parse_pdf_order(self, order_file, detect_doc_type=False):
# 'name': 'Camptocamp',
# 'email': 'luc@camptocamp.com',
# },
# 'invoice_to': { # Same structure and fields as 'partner'; completely optional
# 'vat': 'FR25499247138',
# 'name': 'Camptocamp',
# 'email': 'luc@camptocamp.com',
# },
# 'ship_to': {
# 'partner': partner_dict,
# 'address': {
Expand Down Expand Up @@ -260,18 +269,34 @@ def _prepare_order(self, parsed_order, price_source):
so_vals = soo.play_onchanges(so_vals, ["partner_id"])
so_vals["order_line"] = []
if parsed_order.get("ship_to"):
shipping_partner = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
try:
shipping_partner = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
except UserError:
if not self._can_create_missing_shipping_partner():
raise
shipping_partner = self._create_missing_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
so_vals["partner_shipping_id"] = shipping_partner.id

if parsed_order.get("delivery_detail"):
so_vals.update(parsed_order.get("delivery_detail"))

if parsed_order.get("invoice_to"):
invoicing_partner = bdio._match_partner(
parsed_order["invoice_to"], parsed_order["chatter_msg"], partner_type=""
)
try:
invoicing_partner = bdio._match_partner(
parsed_order["invoice_to"],
parsed_order["chatter_msg"],
partner_type="",
)
except UserError:
if not self._can_create_missing_invoice_partner():
raise
invoicing_partner = self._create_missing_invoice_partner(
parsed_order["invoice_to"], partner, parsed_order["chatter_msg"]
)
so_vals["partner_invoice_id"] = invoicing_partner.id
if parsed_order.get("date"):
so_vals["date_order"] = parsed_order["date"]
Expand Down Expand Up @@ -399,9 +424,16 @@ def import_order_button(self):
commercial_partner = partner.commercial_partner_id
partner_shipping_id = False
if parsed_order.get("ship_to"):
partner_shipping_id = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, []
).id
try:
partner_shipping_id = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, []
).id
except UserError:
if not self._can_create_missing_shipping_partner():
raise
partner_shipping_id = self._create_missing_shipping_partner(
parsed_order["ship_to"], partner, []
).id
existing_quotations = self.env["sale.order"].search(
self._search_existing_order_domain(
parsed_order, commercial_partner, [("state", "in", ("draft", "sent"))]
Expand Down Expand Up @@ -465,9 +497,16 @@ def _prepare_update_order_vals(self, parsed_order, order, partner):
)
vals = {"partner_id": partner.id}
if parsed_order.get("ship_to"):
shipping_partner = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
try:
shipping_partner = bdio._match_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
except UserError:
if not self._can_create_missing_shipping_partner():
raise
shipping_partner = self._create_missing_shipping_partner(
parsed_order["ship_to"], partner, parsed_order["chatter_msg"]
)
vals["partner_shipping_id"] = shipping_partner.id
if parsed_order.get("order_ref"):
vals["client_order_ref"] = parsed_order["order_ref"]
Expand Down Expand Up @@ -712,3 +751,122 @@ def _post_error_lines_message(self, parsed_order, order):
},
subtype_id=self.env.ref("mail.mt_note").id,
)

def _can_create_missing_invoice_partner(self) -> bool:
"""Checks if the current importer allows invoice address creation

When called upon a record, checks field "create_missing_invoice_partner".
When called upon an empty recordset (eg: as a model method), checks context's
key "create_missing_invoice_partner".

Hook method, can be overridden by inheriting modules.
"""
if self:
return self.create_missing_invoice_partner
return bool(self.env.context.get("create_missing_invoice_partner"))
Comment thread
simahawk marked this conversation as resolved.

def _create_missing_invoice_partner(
self, invoice_partner_data: dict, partner: models.BaseModel, chatter_msg: list
) -> models.BaseModel:
"""Creates a new invoice partner for the current import"""
invoice_partner_vals = self._create_missing_invoice_partner_values(
invoice_partner_data, partner, chatter_msg
)
clean_invoice_partner_vals = self._create_missing_partner_vals_cleanup(
invoice_partner_vals
)
invoice_partner = self.env["res.partner"].create([clean_invoice_partner_vals])
chatter_msg.append(
self.env._("Created invoice partner '%s'", invoice_partner.display_name)
)
return invoice_partner

def _create_missing_invoice_partner_values(
self, invoice_partner_data: dict, partner: models.BaseModel, chatter_msg: list
) -> dict:
"""Prepares a new invoice partner's values for the current import"""
# Create a deep copy of the invoice partner data to avoid modifying the original
# dictionary, both keys and values
vals = copy.deepcopy(invoice_partner_data)
vals.update({"type": "invoice", "parent_id": partner.id})
if country_code := vals.get("country_code"):
country = self.env["res.country"].search(
Comment thread
simahawk marked this conversation as resolved.
[("code", "=", country_code)], limit=1
)
if country:
vals["country_id"] = country.id
if (state_code := vals.get("state_code")) and vals.get("country_id"):
state = self.env["res.country.state"].search(
[("code", "=", state_code), ("country_id", "=", vals["country_id"])],
limit=1,
)
if state:
vals["state_id"] = state.id
return vals

def _can_create_missing_shipping_partner(self) -> bool:
"""Checks if the current importer allows shipping address creation

When called upon a record, checks field "create_missing_shipping_partner".
When called upon an empty recordset (eg: as a model method), checks context's
key "create_missing_shipping_partner".

Hook method, can be overridden by inheriting modules.
"""
if self:
return self.create_missing_shipping_partner
return bool(self.env.context.get("create_missing_shipping_partner"))
Comment thread
simahawk marked this conversation as resolved.

def _create_missing_shipping_partner(
self, shipping_partner_data: dict, partner: models.BaseModel, chatter_msg: list
) -> models.BaseModel:
"""Creates a new shipping partner for the current import"""
shipping_partner_vals = self._create_missing_shipping_partner_values(
shipping_partner_data, partner, chatter_msg
)
clean_shipping_partner_vals = self._create_missing_partner_vals_cleanup(
shipping_partner_vals
)
shipping_partner = self.env["res.partner"].create([clean_shipping_partner_vals])
chatter_msg.append(
self.env._("Created shipping partner '%s'", shipping_partner.display_name)
)
return shipping_partner

def _create_missing_shipping_partner_values(
self, shipping_partner_data: dict, partner: models.BaseModel, chatter_msg: list
) -> dict:
"""Prepares a new shipping partner's values for the current import"""
# Create a deep copy of the shipping partner data to avoid modifying the
# original dictionary, both keys and values
vals = copy.deepcopy(shipping_partner_data)
vals.update({"type": "delivery", "parent_id": partner.id})
if country_code := vals.get("country_code"):
country = self.env["res.country"].search(
[("code", "=", country_code)], limit=1
)
if country:
vals["country_id"] = country.id
if (state_code := vals.get("state_code")) and vals.get("country_id"):
state = self.env["res.country.state"].search(
[("code", "=", state_code), ("country_id", "=", vals["country_id"])],
limit=1,
)
if state:
vals["state_id"] = state.id
return vals

def _create_missing_partner_vals_cleanup(self, vals: dict) -> dict:
"""Creates a copy of ``vals`` while removing keys that are not partner fields

A new dictionary is created as a copy of ``vals`` where keys are removed if
they are not ``res.partner`` fields. This allows preserving the old dictionary
for comparison with the new one in inheriting modules.

:param vals: the original values' dictionary to clean up
"""
return {
k: copy.deepcopy(v) # ``deepcopy()`` prevents issues w/ X2many fields cmds
for k, v in vals.items()
if k in self.env["res.partner"]._fields
}
2 changes: 2 additions & 0 deletions sale_order_import/wizard/sale_order_import_view.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
required="doc_type == 'order'"
/>
<field name="skip_error_lines" />
<field name="create_missing_invoice_partner" />
<field name="create_missing_shipping_partner" />
</group>
<footer>
<button
Expand Down
Loading