User:Caron/memo/Automated height correction

From Custom Mario Kart
Jump to navigation Jump to search

Here is the code for the Python implementation of Automated height correction using numpy module.
Tested on python 3.10.

import dataclasses
import os
from concurrent.futures import ProcessPoolExecutor
from typing import ClassVar, Iterable, Union

import numpy as np
import numpy.typing as npt
from typing_extensions import Literal, Self


_int_ptns = [
    'f' + ('\s' + '/'.join([r'+([-+]?\d+)'] * i)) * 3 for i in range(1, 4)]
_face_vert_dtypes = [
    [('v1', 'u2'), ('v2', 'u2'), ('v3', 'u2')],
    [('v1', 'u2'), ('vt1', 'u2'),
     ('v2', 'u2'), ('vt2', 'u2'),
     ('v3', 'u2'), ('vt3', 'u2')],
    [('v1', 'u2'), ('vt1', 'u2'), ('vn1', 'u2'),
     ('v2', 'u2'), ('vt2', 'u2'), ('vn2', 'u2'),
     ('v3', 'u2'), ('vt3', 'u2'), ('vn3', 'u2')]
]

def _get_regex_dtype(header) -> tuple[list[str], list[list[tuple[str, str]]]]:
    if header == 'v':
        return (
            [header + r'\s+([-+]?\d+\.?\d*|\.\d+[eE][-+]?\d+?)' * 3],
            [[('v1', 'f4'), ('v2', 'f4'), ('v3', 'f4')]]
        )
    elif header == 'f':
        return (_int_ptns, _face_vert_dtypes)
    raise ValueError(f'Unsupported header: {header}')


def _regex_parse(
    file: str, header: str
) -> Union[np.ndarray, None]:
    regex, dtype = _get_regex_dtype(header)
    for r, dt in zip(regex, dtype):
        try:
            data = np.fromregex(file=file, regexp=r, dtype=np.dtype(dt))
        except UnicodeDecodeError:
            data = np.fromregex(
                file=file, regexp=r, dtype=np.dtype(dt), encoding='cp932')
        # recarray -> ndarray
        data = (
            data.view(np.recarray).view(dt[0][-1]).reshape(-1, len(dt))
        )
        if data.shape[0] != 0:
            return data
    raise Exception(f"Failed to read {header} data.")


@dataclasses.dataclass(frozen=True)
class Wavefront:
    facetriangles: np.ndarray
    vertices: np.ndarray
    _attr: ClassVar[dict[str, str]] = {'v': 'vertices', 'f': 'facetriangles'}

    @classmethod
    def read(cls: Self, path: str) -> Self:
        """Read the wavefront file.

        Args:
            path (str): The path of the wavefront file.

        Returns:
            Wavefront: The wavefront object. attributes are vertices and facetriangles.
        """
        files = [path] * len(cls._attr)
        init_kwgs = {}
        with ProcessPoolExecutor() as executor:
            fn = [executor.submit(_regex_parse, file=f, header=r)\
                for f, r in zip(files, cls._attr.keys())]
            for header, f in zip(cls._attr.keys(), fn):
                data = f.result()
                if data is None:
                    continue
                if header == 'v' and data.shape[1] == 6:
                    data = data[:, :3]
                init_kwgs[cls._attr[header]] = data

        if len(init_kwgs['vertices']) == 0:
            raise RuntimeError(
                'Failed to read Vertices: {}'.format(path))
        if len(init_kwgs['facetriangles']) == 0:
            raise RuntimeError(
                'Failed to read Face indexes: {}'.format(path))

        lsize = init_kwgs['facetriangles'].shape[1]
        if lsize == 3:
            s = ...
        elif lsize == 6:
            s = slice(None, None, 2)
        else:
            s = slice(None, None, 3)

        init_kwgs['facetriangles'] = init_kwgs['vertices'][
            init_kwgs['facetriangles'][:, s] - 1]
        return cls(**init_kwgs)


def _inplace_pinv(T: np.ndarray) -> np.ndarray:
    """
    Calculate the pseudo-inverse of a matrix in-place.

    Args:
        T: The matrix to invert. 2D or 3D.

    Returns:
        np.ndarray: 2D or 3D matrix.
        if `T` is 3D, the first dimension is the same as `T`.
    """
    if T.ndim >= 3:
        assert T.ndim == 3
        if T.shape[1] != T.shape[2]:
            assert T.shape[0] == T.shape[1]
            T = T.T
    dt = np.linalg.det(T)
    invalid_mask = np.isclose(dt, 0)
    if not np.any(invalid_mask):
        # can be inverted
        return np.linalg.inv(T)
    if np.any(invalid_mask) and T.ndim == 2:
        # apply pseudo inverse
        return np.linalg.pinv(T)

    # in-place inversion
    T_inv_invalid = np.linalg.pinv(T[invalid_mask])
    T_ = T.copy()
    # replace to dummy
    T_[invalid_mask] = np.eye(T_.shape[-1])
    T_inv_ = np.linalg.inv(T_)
    T_inv_[invalid_mask] = T_inv_invalid
    return T_inv_


def autoY(
    obj: Union[Wavefront, str, os.PathLike],
    pos: npt.ArrayLike,
    index: Union[int, Iterable[int], None] = None,
    priority: Literal['topmost', 'lowest', 'nearest'] = 'topmost',
    drop_undriveable_face: bool = True,
    inplace_inv: bool = False,
) -> np.ndarray:
    """
    Automatic Height Correction for Y-Axis.

    Args:
        obj (Wavefront, str, os.PathLike): Wavefront object or path to .obj file.
        pos (np.ndarray, list, tuple, etc.): 2D or 3D array of the coordinates of the object.
        Last dimension must be 3.
        index (int or iterable, optional): Index of the object.
        priority (str, optional): Priority when there are multiple faces.
        Defaults to 'topmost'.
        drop_undriveable_face (bool, optional): Drop surfaces with normals
        less than or equal to 0.
        inplace_inv (bool, optional): Enable in-place matrix inversion.
        Use if inverse calculation fails. Defaults to False.

    Returns:
        np.ndarray: Array of the same shape as `pos`.
    """
    assert priority in ('topmost', 'lowest', 'nearest'), priority

    if not isinstance(obj, Wavefront):
        obj = Wavefront.read(obj)

    pos = np.asarray(pos, dtype=np.float32)
    if pos.ndim not in (1, 2):
        raise ValueError(f'`pos` must be 1D or 2D array. Got {pos.ndim}D array.')
    if pos.shape[-1] != 3:
        raise ValueError(f'Last dimension of `pos` must be 3. Got {pos.shape[-1]}.')

    posdim_pre = pos.ndim

    if posdim_pre == 1:
        pos = pos[None]

    if index is None:
        index = list(range(pos.shape[0]))
    elif isinstance(index, int):
        index = [index]
    elif isinstance(index, Iterable):
        assert all(isinstance(i, int) for i in index), index
        index = list(index)

    pos_ = pos.copy() / 100.
    face_tris_ = obj.facetriangles.copy() / 100.

    # Calculate barycentric coordinate
    # See: https://en.wikipedia.org/wiki/Barycentric_coordinate_system

    r1, r2, r3 = map(
        lambda x: x.squeeze(axis=1), np.split(face_tris_, 3, axis=1))
    # T = \left{
        # \begin{matrix}
        # x_1 - x_3 & x_2 - x_3 \\
        # y_1 - y_3 & y_2 - y_3
        # \end{matrix}
    # \right}
    T1 = np.concatenate(
        [r1[None, :, 0] - r3[None, :, 0], r2[None, :, 0] - r3[None, :, 0]])
    T2 = np.concatenate(
        [r1[None, :, -1] - r3[None, :, -1], r2[None, :, -1] - r3[None, :, -1]])
    T = np.concatenate([T1[None], T2[None]], axis=0)
    try:
        T_inv = np.linalg.inv(T.T)
    except np.linalg.LinAlgError as e:
        if not inplace_inv:
            raise ValueError(
                'Cannot calculate inverse with this object. '
                'If you want to calculate anyway, set `inplace_inv` to True.'
            ) from e
        T_inv = _inplace_pinv(T.T)

    result = []
    for idx in index:
        # target position
        r = pos_[idx]
        # \Lambda = T^{-1} (r - r_3)
        l = np.einsum("Nab, Nb->Na", T_inv, (r - r3)[:, [0, 2]])
        l1 = l[:, 0]
        l2 = l[:, 1]
        l3 = 1 - l1 - l2

        cand, = np.where(
            (
                (0 <= l1) & (l1 <= 1)
                & (0 <= l2) & (l2 <= 1)
                & (0 <= l3) & (l3 <= 1)
            )
        )

        # compute fixed Y
        faceZs = obj.facetriangles[cand].astype(np.float64)
        fcross = np.cross(
            faceZs[:, 1] - faceZs[:, 0], faceZs[:, 2] - faceZs[:, 0])
        if drop_undriveable_face:
            fnorm = fcross / np.linalg.norm(fcross, axis=-1, keepdims=True)
            cand = cand[fnorm[:, 1] > 0]
            if cand.size == 0:
                raise ValueError('Cannot find a face that can be driven.')
        d = -np.einsum("Na, Na->N", fcross, faceZs[:, 2])
        fixedY = -np.divide(
            fcross[:, 0] * r[0] + fcross[:, 2] * r[2] + d, fcross[:, 1]
        ).astype(np.float32)

        if cand.size == 1:
            newpos = np.array([pos[idx, 0], fixedY[0], pos[idx, 2]])
        else:
            n_ = len(fixedY)
            fixedrs = np.stack(
                [np.tile(pos[idx, 0], n_), fixedY, np.tile(pos[idx, 2], n_)]).T
            dist = np.linalg.norm(fixedrs - r, axis=1)

            if priority == 'nearest':
                newpos = fixedrs[np.argmin(dist)]
            elif priority == 'topmost':
                newpos = fixedrs[np.argmax(fixedY)]
            else:
                newpos = fixedrs[np.argmin(fixedY)]

        result.append(newpos.astype(pos_.dtype))

    result = np.stack(result)

    if posdim_pre == 1:
        result = result[0]
    return result

Example

import numpy as np

# change if you use above code:
# from autoY import Wavefront, autoY

from pykmp._io._wavefront import Wavefront
from pykmp.math.autoY import autoY

print(autoY('./sample.obj', [-20452.227,   1,   -7238.337]))
# [-20452.227  19195.1    -7238.337]