Skip to content

cpd

Module for performing coherent point drift

Callback

Bases: Protocol

Callback idea shamelessly stolen from pycpd.

Source code in megham/registration/cpd.py
17
18
19
20
21
22
23
24
25
26
27
28
29
class Callback(Protocol):
    """
    Callback idea shamelessly stolen from pycpd.
    """

    def __call__(
        self,
        target: NDArray[np.floating],
        transformed: NDArray[np.floating],
        iteration: int,
        err: float,
    ):
        ...

compute_P(source, target, var, w)

Compute matrix of probabilities of matches between points in source and target.

Parameters:

Name Type Description Default
source NDArray[floating]

The set of source points to be mapped onto the target points. Nominally when you run this it will be the source points transformed to line up with target. Should have shape (nsrcpoints, ndim).

required
target NDArray[floating]

The set of target points to be mapped onto. Should have shape (ntrgpoints, ndim).

required
var NDArray[floating]

The variance of the gaussian mixture model. Should have shape (ndim,).

required
w float

The weight of the uniform distrubution.

0.0

Returns:

Name Type Description
P NDArray[floating]

Probability matrix of matches between the source and target points. P[i,j] is the probability that source[i] corresponds to target[j]. Has shape (nsrcpoints, ntrgpoints).

Source code in megham/registration/cpd.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def compute_P(
    source: NDArray[np.floating],
    target: NDArray[np.floating],
    var: NDArray[np.floating],
    w: float,
) -> NDArray[np.floating]:
    """
    Compute matrix of probabilities of matches between points in source and target.

    Parameters
    ----------
    source : NDArray[np.floating]
        The set of source points to be mapped onto the target points.
        Nominally when you run this it will be the source points transformed to line up with target.
        Should have shape (nsrcpoints, ndim).
    target : NDArray[np.floating]
        The set of target points to be mapped onto.
        Should have shape (ntrgpoints, ndim).
    var : NDArray[np.floating]
        The variance of the gaussian mixture model.
        Should have shape (ndim,).
    w : float, default: 0.0
        The weight of the uniform distrubution.

    Returns
    -------
    P : NDArray[np.floating]
        Probability matrix of matches between the source and target points.
        P[i,j] is the probability that source[i] corresponds to target[j].
        Has shape (nsrcpoints, ntrgpoints).
    """
    # TODO: implement fast gaussian transform
    nsrcpoints, ndim = source.shape
    ntrgpoints = len(target)

    uni = (
        (w / (1 - w))
        * (nsrcpoints / ntrgpoints)
        * np.sqrt(((2 * np.pi) ** ndim) * np.product(var))
    )
    gaussians = np.ones((nsrcpoints, ntrgpoints))
    for dim in range(ndim):
        sq_diff = dist.cdist(
            np.atleast_2d(source[:, dim]).T,
            np.atleast_2d(target[:, dim]).T,
            metric="sqeuclidean",
        )
        gaussians *= np.exp(-0.5 * sq_diff / var[dim])
    norm_fac = np.clip(np.sum(gaussians, axis=0), epsilon, None)

    P = gaussians / (norm_fac + uni)

    return P

cpd(source, target, w=0.0, eps=1e-10, max_iters=500, callback=dummy_callback, method='affine')

Compute the CPD. This is just a wrapper around joint_cpd that puts everything into one dim_group. See the docstring of joint_cpd for details on the parameters and returns.

See https://arxiv.org/abs/0905.2635 for details on the base CPD algorithm.

Source code in megham/registration/cpd.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def cpd(
    source: NDArray[np.floating],
    target: NDArray[np.floating],
    w: float = 0.0,
    eps: float = 1e-10,
    max_iters: int = 500,
    callback: Callback = dummy_callback,
    method: str = "affine",
) -> tuple[
    NDArray[np.floating],
    NDArray[np.floating],
    NDArray[np.floating],
    NDArray[np.floating],
]:
    """
    Compute the CPD.
    This is just a wrapper around joint_cpd that puts everything into one dim_group.
    See the docstring of joint_cpd for details on the parameters and returns.

    See https://arxiv.org/abs/0905.2635 for details on the base CPD algorithm.
    """
    return joint_cpd(
        source=source,
        target=target,
        dim_groups=None,
        w=w,
        eps=eps,
        max_iters=max_iters,
        callback=callback,
        method=method,
    )

joint_cpd(source, target, dim_groups=None, w=0.0, eps=0.001, max_iters=500, callback=dummy_callback, method='affine')

Compute the joint CPD.

This is an extension of the base CPD algorithm that allows you to fit for transforms that treat groups of dimensions seperately but use a jointly computed probability matrix and variance. This is useful in cases where you have information about a set of points that is correlated but in a different basis so you want to avoid degrees of freedom that mix the disprate baseis. For example if you have a set of points where you have both spatial information as well as some non-spatial scalar associated with each point (ie: frequency) you can use this to compute a registration that considers these jointly but transforms them seperately to avoid a scenario where you have a rotation between a spatial and a non-spatial dimension.

Parameters:

Name Type Description Default
source NDArray[floating]

The set of source points to be mapped onto the target points. Should have shape (nsrcpoints, ndim).

required
target NDArray[floating]

The set of target points to be mapped onto. Should have shape (ntrgpoints, ndim).

required
dim_groups Optional[Sequence[Sequence[int] | NDArray[int_]]]

Which dimensions should be transformed together. Each element in this sequence should be a sequence of array of ints that correspond to the columns of source and target that are transformed together. Any columns that are not included here will not be transformed but will still be used when computing the probability and variance. None of the elements in this sequence can have overlap. If set to None all the dimensions will be transformed together.

None
w float

The weight of the uniform distrubution. Set higher to reduce sensitivity to noise and outliers at the expense of potentially worse performance. Should be in the range [0, 1), if not it will snap to one of the bounds.

0.0
eps float

The convergence criteria. When the change in the objective function is less than or equal to this we stop.

1e-10
max_iters int

The maximum number of iterations to run for.

500
callback Callback

Function that runs once per iteration, can be used to visualize the match process. See the Callback Protocol for details on the expected signature.

dummy_callback
method str

The type of transformation to compute. Acceptable values are: affine, rigid. If any other value is passed then transform will be the identity.

'affine'

Returns:

Name Type Description
transform NDArray[floating]

The transform transformation that takes source to target. Apply using megham.transform.apply_transform. Has shape (ndim, ndim).

shift NDArray[floating]

The transformation that takes source to target after transform is applied. Apply using megham.transform.apply_transform. Has shape (ndim,).

transformed NDArray[floating]

Source transformed to align with target. Has shape (nsrcpoints, ndim).

P NDArray[floating]

Probability matrix of matches between the source and target points. P[i,j] is the probability that source[i] corresponds to target[j]. Has shape (nsrcpoints, ntrgpoints).

Raises:

Type Description
ValueError

If source and target don't share ndim. If dim_groups has repeated dimensions or invalid dimensions.

Source code in megham/registration/cpd.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def joint_cpd(
    source: NDArray[np.floating],
    target: NDArray[np.floating],
    dim_groups: Optional[Sequence[Sequence[int] | NDArray[np.int_]]] = None,
    w: float = 0.0,
    eps: float = 1e-3,
    max_iters: int = 500,
    callback: Callback = dummy_callback,
    method: str = "affine",
) -> tuple[
    NDArray[np.floating],
    NDArray[np.floating],
    NDArray[np.floating],
    NDArray[np.floating],
]:
    """
    Compute the joint CPD.

    This is an extension of the base CPD algorithm that allows you to fit for
    transforms that treat groups of dimensions seperately but use a jointly computed
    probability matrix and variance.
    This is useful in cases where you have information about a set of points that is correlated
    but in a different basis so you want to avoid degrees of freedom that mix the disprate baseis.
    For example if you have a set of points where you have both spatial information as well as some
    non-spatial scalar associated with each point (ie: frequency) you can use this to compute a
    registration that considers these jointly but transforms them seperately to avoid a scenario
    where you have a rotation between a spatial and a non-spatial dimension.

    Parameters
    ----------
    source : NDArray[np.floating]
        The set of source points to be mapped onto the target points.
        Should have shape (nsrcpoints, ndim).
    target : NDArray[np.floating]
        The set of target points to be mapped onto.
        Should have shape (ntrgpoints, ndim).
    dim_groups : Optional[Sequence[Sequence[int] | NDArray[np.int_]]], default: None
        Which dimensions should be transformed together.
        Each element in this sequence should be a sequence of array of ints that correspond to the
        columns of source and target that are transformed together.
        Any columns that are not included here will not be transformed but will still be used when
        computing the probability and variance.
        None of the elements in this sequence can have overlap.
        If set to None all the dimensions will be transformed together.
    w : float, default: 0.0
        The weight of the uniform distrubution.
        Set higher to reduce sensitivity to noise and outliers at the expense
        of potentially worse performance.
        Should be in the range [0, 1), if not it will snap to one of the bounds.
    eps : float, default: 1e-10
        The convergence criteria.
        When the change in the objective function is less than or equal to this we stop.
    max_iters : int, default: 500
        The maximum number of iterations to run for.
    callback: Callback, default: dummy_callback
        Function that runs once per iteration, can be used to visualize the match process.
        See the Callback Protocol for details on the expected signature.
    method : str, default: 'affine'
        The type of transformation to compute.
        Acceptable values are: affine, rigid.
        If any other value is passed then transform will be the identity.

    Returns
    -------
    transform : NDArray[np.floating]
        The transform transformation that takes source to target.
        Apply using megham.transform.apply_transform.
        Has shape (ndim, ndim).
    shift : NDArray[np.floating]
        The transformation that takes source to target after transform is applied.
        Apply using megham.transform.apply_transform.
        Has shape (ndim,).
    transformed : NDArray[np.floating]
        Source transformed to align with target.
        Has shape (nsrcpoints, ndim).
    P : NDArray[np.floating]
        Probability matrix of matches between the source and target points.
        P[i,j] is the probability that source[i] corresponds to target[j].
        Has shape (nsrcpoints, ntrgpoints).

    Raises
    ------
    ValueError
        If source and target don't share ndim.
        If dim_groups has repeated dimensions or invalid dimensions.
    """
    ndim = source.shape[1]
    if target.shape[1] != ndim:
        raise ValueError(
            f"Source and target don't have same ndim ({ndim} vs {target.shape[1]})"
        )
    if dim_groups is None:
        dim_groups = [
            np.arange(ndim),
        ]
    else:
        dims_flat = np.concatenate(dim_groups)
        dims_bad = dims_flat[(dims_flat < 0) + (dims_flat >= ndim)]
        if len(dims_bad):
            raise ValueError(f"Invalid dimensions in dim_groups: {dims_bad}")
        dims_uniq, counts = np.unique(dims_flat, return_counts=True)
        repeats = dims_uniq[counts > 1]
        if len(repeats):
            raise ValueError(f"Repeated dimensions in dim_groups: {repeats}")
        dim_groups = [np.array(dim_group, dtype=int) for dim_group in dim_groups]

    var = estimate_var(source, target, dim_groups)
    err = np.inf

    transform = np.eye(ndim)
    shift = np.zeros(ndim)

    transformed = source.copy()
    P = np.ones((len(source), len(target)))
    for i in range(max_iters):
        _err, _transform, _shift = err, transform, shift
        transformed = apply_transform(source, transform, shift)
        P = compute_P(transformed, target, var, w)
        transform, shift, var, err = solve_transform(
            source, target, P, dim_groups, var, method
        )
        callback(target, transformed, i, err)

        if _err - err < eps:
            if err > _err:
                transform, shift = _transform, _shift
            break

    return transform, shift, transformed, P

solve_transform(source, target, P, dim_groups, cur_var, method='affine')

Solve for the transformation at each iteration of CPD.

Parameters:

Name Type Description Default
source NDArray[floating]

The set of source points to be mapped onto the target points. Should have shape (nsrcpoints, ndim).

required
target NDArray[floating]

The set of target points to be mapped onto. Should have shape (ntrgpoints, ndim).

required
P NDArray[floating]

Probability matrix of matches between the source and target points.

required
dim_groups Sequence[NDArray[int_]]

Which dimensions should be transformed together.

required
cur_var float

The current mixture model variance.

required
method str

The type of transformation to compute. Acceptable values are: affine, rigid. If any other value is passed then transform will be the identity.

'affine'

Returns:

Name Type Description
transform NDArray[floating]

Transformation between source and target.

shift NDArray[floating]

Shift to be applied after the affine transformation.

var NDArray[floating]

Current variance of the mixture model.

err float

Current value of the error function.

Source code in megham/registration/cpd.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def solve_transform(
    source: NDArray[np.floating],
    target: NDArray[np.floating],
    P: NDArray[np.floating],
    dim_groups: Sequence[NDArray[np.int_]],
    cur_var: NDArray[np.floating],
    method: str = "affine",
) -> tuple[NDArray[np.floating], NDArray[np.floating], NDArray[np.floating], float]:
    """
    Solve for the transformation at each iteration of CPD.

    Parameters
    ----------
    source : NDArray[np.floating]
        The set of source points to be mapped onto the target points.
        Should have shape (nsrcpoints, ndim).
    target : NDArray[np.floating]
        The set of target points to be mapped onto.
        Should have shape (ntrgpoints, ndim).
    P : NDArray[np.floating]
        Probability matrix of matches between the source and target points.
    dim_groups : Sequence[NDArray[np.int_]]
        Which dimensions should be transformed together.
    cur_var : float
        The current mixture model variance.
    method : str, default: 'affine'
        The type of transformation to compute.
        Acceptable values are: affine, rigid.
        If any other value is passed then transform will be the identity.

    Returns
    -------
    transform : NDArray[np.floating]
        Transformation between source and target.
    shift : NDArray[np.floating]
        Shift to be applied after the affine transformation.
    var : NDArray[np.floating]
        Current variance of the mixture model.
    err : float
        Current value of the error function.
    """
    ndim = source.shape[1]
    N_P = np.sum(P)
    PT1 = np.diag(np.sum(P, axis=0))
    P1 = np.diag(np.sum(P, axis=1))

    mu_src = np.sum(P.T @ source, axis=0) / N_P
    mu_trg = np.sum(P @ target, axis=0) / N_P

    src_hat = source - mu_src
    trg_hat = target - mu_trg

    transform = np.eye(ndim)
    shift = np.zeros(ndim)
    new_var = cur_var.copy()
    err = 0
    for dim_group in dim_groups:
        mu_s = mu_src[dim_group]
        mu_t = mu_trg[dim_group]
        src = src_hat[:, dim_group]
        trg = trg_hat[:, dim_group]

        all_mul = trg.T @ P.T @ src
        src_mul = src.T @ P1 @ src

        if method == "affine":
            tfm = np.linalg.solve(src_mul.T, all_mul.T)
        elif method == "rigid":
            U, _, V = la.svd(all_mul, full_matrices=True)
            corr = np.eye(len(dim_group))
            corr[-1, -1] = la.det((V) @ (U))
            tfm = U @ corr @ V
        else:
            tfm = np.eye(ndim)

        sft = mu_t.T - tfm.T @ mu_s.T

        transform[dim_group[:, np.newaxis], dim_group] = tfm
        shift[dim_group] = sft

        trc_trf_mul = np.trace(tfm @ src_mul @ tfm)
        trc_trg_mul = np.trace(trg.T @ PT1 @ trg)
        trc_all = np.trace(all_mul @ tfm)

        var = (trc_trg_mul - trc_all) / (N_P * len(dim_group))
        if var <= 0:
            var = cur_var[dim_group] / 2
        new_var[dim_group] = var

        err += (trc_trg_mul - 2 * trc_all + trc_trf_mul) / (
            2 * cur_var[dim_group][0]
        ) + 0.5 * N_P * len(dim_group) * np.log(cur_var[dim_group][0])

    return transform, shift, new_var, err