Source code for cot.assignment

import numpy as np
import torch

def feasibility_check(ind_b, ind_a, yB, yA, Ma, cost):
    # check approximate dual feasibility: yA[j] + yB[i] <= cost[i][j] + 1 for every
    # edge, and yA[j] + yB[i] == cost[i][j] on matched edges
    if yA[ind_a] + yB[ind_b] > cost[ind_b][ind_a] + 1:
        print("assertion failed")
    if Ma[ind_a] == ind_b and yA[ind_a] + yB[ind_b] != cost[ind_b][ind_a]:
        print("assertion failed in assignment")
        print(yA[ind_a] + yB[ind_b])
        print(cost[ind_b][ind_a])

def assignment_check(Ma, Mb):
    # check that the assignment is complete and consistent in both directions
    if np.size(np.where(Mb == -1)) != 0:
        print("empty assignment exists")
    if np.size(np.where(Ma == -1)) != 0:
        print("empty assignment exists")
    for ind_b in range(Mb.shape[0]):
        ind_a = Mb[ind_b]
        if ind_a != -1 and ind_b != Ma[ind_a]:
            print("misassignment")
    for ind_a in range(Ma.shape[0]):
        ind_b = Ma[ind_a]
        if ind_b != -1 and ind_a != Mb[ind_b]:
            print("misassignment")

def assignment_check_torch(Ma, Mb):
    # check that the assignment is complete and consistent in both directions
    if torch.numel(torch.where(Mb == -1)[0]) != 0:
        print("empty assignment exists")
    if torch.numel(torch.where(Ma == -1)[0]) != 0:
        print("empty assignment exists")
    for ind_b in range(Mb.shape[0]):
        ind_a = Mb[ind_b]
        if ind_a != -1 and ind_b != Ma[ind_a]:
            print("misassignment")
    for ind_a in range(Ma.shape[0]):
        ind_b = Ma[ind_a]
        if ind_b != -1 and ind_a != Mb[ind_b]:
            print("misassignment")

def assignment(W, C, eps, seed=0):
    r"""
    This function solves an additive approximation of the assignment problem
    between two discrete distributions and returns the assignment plan and cost.
    This function is a numpy implementation of the parallelizable combinatorial
    algorithm [2]_ for the assignment problem.

    Parameters
    ----------------
    W : numpy array, shape (n, n)
        An n by n cost matrix; each W(i, j) represents the cost between the
        i-th type b vertex and the j-th type a vertex from the samples of the
        supply and demand distributions.
    C : float
        The scale of the cost metric.
    eps : float
        The additive error of the optimal transport distance, the value of
        :math:`\epsilon` in paper [2]_.
    seed : int, optional
        Random seed used to break ties among zero-slack edges.

    Returns
    ----------------
    Mb : numpy array, shape (n,)
        A 1 by n array; entry i is the index of the type a vertex (j-th column
        of the cost matrix, i.e. W[:, j]) assigned to the i-th type b vertex.
    yA : numpy array, shape (n,)
        A 1 by n array; entry i is the final dual value of the i-th type a vertex.
    yB : numpy array, shape (n,)
        A 1 by n array; entry i is the final dual value of the i-th type b vertex.
    total_cost : float
        Total cost of the final assignment.

    References
    ----------------
    .. [2] Lahn, Nathaniel, Sharath Raghvendra, and Kaiyi Zhang. A combinatorial
       algorithm for approximating the optimal transport in the parallel and MPC
       settings. Advances in Neural Information Processing Systems (NeurIPS) 36, 2023.
    """
    n = W.shape[0]
    S = (3*W//(eps)).astype(int)
    # cost = (3*W//(eps)).astype(int)
    yB = np.ones(n, dtype=int)
    yA = np.zeros(n, dtype=int)
    Mb = np.ones(n, dtype=int) * -1
    Ma = np.ones(n, dtype=int) * -1
    f = n
    iteration = 0
    np.random.seed(seed)
    while f > n*eps/C:
        ind_b_free = np.where(Mb == -1)
        ind_S_zero = np.where(S[ind_b_free] == 0)

        # find push edges
        ind_b_tent_ind, free_S_edge_B_ind_range_lt_inclusive = np.unique(ind_S_zero[0], return_index=True)  # get tentative B to push and left index of B range
        ind_b_tent = ind_b_free[0][ind_b_tent_ind]
        free_S_edge_B_ind_range_rt_exclusive = np.append(free_S_edge_B_ind_range_lt_inclusive[1:], len(ind_S_zero[0]))  # right index of B range
        free_S_edge_B_ind_rand = np.random.randint(free_S_edge_B_ind_range_lt_inclusive, free_S_edge_B_ind_range_rt_exclusive)  # randomly pick an index from each unique B range
        ind_a_tent = ind_S_zero[1][free_S_edge_B_ind_rand]  # get tentative A to push
        ind_a_push, tent_ind = np.unique(ind_a_tent, return_index=True)  # find exact a to push, and corresponding index
        ind_b_push = ind_b_tent[tent_ind]  # find exact b to push

        # find release edges
        ind_release = np.nonzero(Ma[ind_a_push] != -1)[0]
        edges_released = (Ma[ind_a_push][ind_release], ind_a_push[ind_release])

        # update flow
        f -= len(ind_a_push) - len(ind_release)

        # release edges
        Mb[Ma[edges_released[1]]] = -1

        # push edges
        edges_pushed = (ind_b_push, ind_a_push)
        Ma[ind_a_push] = ind_b_push
        Mb[ind_b_push] = ind_a_push
        yA[ind_a_push] -= 1

        # find b vertices that could not be pushed
        ind_b_not_pushed = np.setdiff1d(ind_b_free[0], ind_b_tent)
        yB[ind_b_not_pushed] += 1

        # update slack
        S[edges_released] += 1
        S[edges_pushed] -= 1
        S[:, edges_pushed[1]] += 1
        S[ind_b_not_pushed, :] -= 1
        iteration += 1

    # for ind_b in range(n):
    #     for ind_a in range(n):
    #         feasibility_check(ind_b, ind_a, yB, yA, Ma, cost)

    # assign the remaining free vertices arbitrarily
    ind_a = 0
    for ind_b in range(n):
        if Mb[ind_b] == -1:
            while Ma[ind_a] != -1:
                ind_a += 1
            Mb[ind_b] = ind_a
            Ma[ind_a] = ind_b

    # assignment_check(Ma, Mb)
    # print("assignment check passed")

    assignment_cost = 0
    for ind_b in range(n):
        assignment_cost += W[ind_b, Mb[ind_b]]
    assignment_cost = assignment_cost/n
    return Mb, yA, yB, assignment_cost
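
# Illustrative usage sketch (not part of the original module). The point sets,
# cost scale C and additive error eps below are arbitrary assumptions for
# demonstration; the cost matrix is the Euclidean distance between two random
# 2-D point clouds.
def _example_assignment_numpy():
    rng = np.random.RandomState(0)
    n = 64
    xb = rng.rand(n, 2)  # samples of the type b (supply) distribution
    xa = rng.rand(n, 2)  # samples of the type a (demand) distribution
    W = np.linalg.norm(xb[:, None, :] - xa[None, :, :], axis=2)  # n x n cost matrix
    C = W.max()   # scale of the cost metric
    eps = 0.1     # additive error
    Mb, yA, yB, cost = assignment(W, C, eps, seed=0)
    print("approximate assignment cost:", cost)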

def assignment_torch(W, C, eps, device, seed=1):
    r"""
    This function solves an additive approximation of the assignment problem
    between two discrete distributions and returns the assignment plan and cost.
    This function is a torch implementation of the parallelizable combinatorial
    algorithm [2]_ for the assignment problem.

    Parameters
    ----------------
    W : tensor, shape (n, n)
        An n by n cost matrix; each W(i, j) represents the cost between the
        i-th type b vertex and the j-th type a vertex from the samples of the
        supply and demand distributions.
    C : tensor
        The scale of the cost metric.
    eps : tensor
        The additive error of the optimal transport distance, the value of
        :math:`\epsilon` in paper [2]_.
    device : torch.device
        The device where the computation will be executed
        (e.g. torch.device('cuda:0') for GPU).
    seed : int, optional
        Random seed used to break ties among zero-slack edges.

    Returns
    ----------------
    Mb : tensor, shape (n,)
        A 1 by n array; entry i is the index of the type a vertex (j-th column
        of the cost matrix, i.e. W[:, j]) assigned to the i-th type b vertex.
    yA : tensor, shape (n,)
        A 1 by n array; entry i is the final dual value of the i-th type a vertex.
    yB : tensor, shape (n,)
        A 1 by n array; entry i is the final dual value of the i-th type b vertex.
    total_cost : float
        Total cost of the final assignment.

    References
    ----------------
    .. [2] Lahn, Nathaniel, Sharath Raghvendra, and Kaiyi Zhang. A combinatorial
       algorithm for approximating the optimal transport in the parallel and MPC
       settings. Advances in Neural Information Processing Systems (NeurIPS) 36, 2023.
    """
    dtyp = torch.int64
    n = W.shape[1]
    m = W.shape[0]
    S = (3*W/(eps)).type(dtyp).to(device)
    # cost = (3*W/(eps)).type(dtyp).to(device)  # scaled cost for feasibility validation
    yB = torch.ones(m, device=device, dtype=dtyp, requires_grad=False)
    yA = torch.zeros(n, device=device, dtype=dtyp, requires_grad=False)
    Mb = torch.ones(m, device=device, dtype=dtyp, requires_grad=False) * -1
    Ma = torch.ones(n, device=device, dtype=dtyp, requires_grad=False) * -1
    f = n
    iteration = 0
    n = W.shape[0]
    zero = torch.tensor([0], device=device, dtype=dtyp, requires_grad=False)[0]
    one = torch.tensor([1], device=device, dtype=dtyp, requires_grad=False)[0]
    m_one = torch.tensor([-1], device=device, dtype=dtyp, requires_grad=False)[0]
    f_threshold = n*eps/C
    torch.manual_seed(seed)
    while f > f_threshold:
        ind_b_free = torch.where(Mb == m_one)
        ind_S_zero_ind = torch.where(S[ind_b_free] == zero)

        # find push edges
        ind_b_tent_ind, free_S_edge_B_ind_range_lt_inclusive = unique(ind_S_zero_ind[0], input_sorted=True)  # get tentative B to push and left index of B range
        ind_b_tent = ind_b_free[0][ind_b_tent_ind]
        free_S_edge_B_ind_range_rt_exclusive = torch.cat((free_S_edge_B_ind_range_lt_inclusive[1:],
                                                          torch.tensor(ind_S_zero_ind[0].shape, device=device, dtype=dtyp, requires_grad=False)))  # right index of B range
        rand_n = torch.rand(ind_b_tent.shape[0], device=device)
        free_S_edge_B_ind_rand = free_S_edge_B_ind_range_lt_inclusive + ((free_S_edge_B_ind_range_rt_exclusive - free_S_edge_B_ind_range_lt_inclusive)*rand_n).to(dtyp)  # randomly pick an index from each unique B range
        ind_a_tent = ind_S_zero_ind[1][free_S_edge_B_ind_rand]  # get tentative A to push
        ind_a_push, tent_ind = unique(ind_a_tent, input_sorted=False)  # find exact a to push, and corresponding index
        ind_b_push = ind_b_tent[tent_ind]  # find exact b to push

        # find release edges
        ind_release = torch.nonzero(Ma[ind_a_push] != -1, as_tuple=True)[0]
        edges_released = (Ma[ind_a_push][ind_release], ind_a_push[ind_release])

        # update flow
        f -= len(ind_a_push) - len(ind_release)

        # release edges
        Mb[Ma[edges_released[1]]] = m_one

        # push edges
        edges_pushed = (ind_b_push, ind_a_push)
        Ma[ind_a_push] = ind_b_push
        Mb[ind_b_push] = ind_a_push
        yA[ind_a_push] -= one

        # find b vertices that could not be pushed
        min_slack, _ = torch.min(S[ind_b_free[0], :], dim=1)
        min_slack_ind = torch.where(min_slack != 0)[0]
        ind_b_not_pushed = ind_b_free[0][min_slack_ind]
        yB[ind_b_not_pushed] += min_slack[min_slack_ind]

        # update slack
        S[edges_released] += one
        S[edges_pushed] -= one
        S[:, edges_pushed[1]] += one
        S[ind_b_not_pushed, :] -= min_slack[min_slack_ind][:, None]
        iteration += 1

    yA = yA.cpu().detach()
    yB = yB.cpu().detach()
    Ma = Ma.cpu().detach()
    Mb = Mb.cpu().detach()

    # assign the remaining free vertices arbitrarily
    ind_a = 0
    for ind_b in range(n):
        if Mb[ind_b] == -1:
            while Ma[ind_a] != -1:
                ind_a += 1
            Mb[ind_b] = ind_a
            Ma[ind_a] = ind_b

    # assignment_check(Ma, Mb)  # check the validity of the assignment

    assignment_cost = torch.sum(W[torch.arange(0, n, dtype=torch.int64), Mb])
    assignment_cost = assignment_cost/n
    return Mb, yA, yB, assignment_cost
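
# Illustrative usage sketch (not part of the original module). The device choice,
# problem size and eps below are assumptions for demonstration only; torch.cdist
# is used simply to build a Euclidean cost matrix between two random point clouds.
def _example_assignment_torch():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    n = 64
    xb = torch.rand(n, 2)                   # samples of the type b (supply) distribution
    xa = torch.rand(n, 2)                   # samples of the type a (demand) distribution
    W = torch.cdist(xb, xa).to(device)      # n x n cost matrix
    C = W.max()                             # scale of the cost metric
    eps = torch.tensor(0.1, device=device)  # additive error
    Mb, yA, yB, cost = assignment_torch(W, C, eps, device, seed=1)
    print("approximate assignment cost:", float(cost))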

def unique(x, input_sorted=False):
    """
    Returns the unique elements of tensor x and the indices of the first
    occurrences of those unique values in the original tensor.
    """
    unique_vals, inverse_ind, unique_count = torch.unique(x, return_inverse=True, return_counts=True)
    unique_ind = unique_count.cumsum(0)
    if not unique_ind.size()[0] == 0:
        unique_ind = torch.cat((torch.tensor([0], dtype=x.dtype, device=x.device), unique_ind[:-1]))
    if not input_sorted:
        # map first-occurrence positions from the sorted order back to the original order
        _, sort2ori_ind = torch.sort(inverse_ind, stable=True)
        unique_ind = sort2ori_ind[unique_ind]
    return unique_vals, unique_ind
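
# Illustrative sketch (not part of the original module): the helper above is a
# torch counterpart of np.unique(..., return_index=True), returning the sorted
# unique values together with the indices of their first occurrences.
def _example_unique():
    x = torch.tensor([3, 1, 3, 2, 1, 0])
    values, first_ind = unique(x, input_sorted=False)
    print(values)     # tensor([0, 1, 2, 3])
    print(first_ind)  # tensor([5, 1, 3, 0])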