.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "auto_paper_results/funcs_single_sim.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note Click :ref:`here ` to download the full example code .. rst-class:: sphx-glr-example-title .. _sphx_glr_auto_paper_results_funcs_single_sim.py: Main functions ============== Various funcctions that are used when running single_simulation.py .. GENERATED FROM PYTHON SOURCE LINES 9-16 .. code-block:: default import numpy as np import mne_connectivity from scipy import signal from scipy.fft import fft, ifft .. GENERATED FROM PYTHON SOURCE LINES 17-50 .. code-block:: default def select_sources(G, src, N_loc): """ Select N_loc pairs of active sources, given the leadfield (G) and the source locations (src), so that the distance between the two sources is grater than 4 cm (0.04 m) and the intensity at sensor level is similar (i.e. the ratio of the norm of the corresponding leadfield columns is in (9/10, 10/9)) Parameters: G: array, shape (N_sensors, N_sources) leadfeld matrix src: array, shape (N_sources,3) sources locations in the source space N_loc: int number of pairs of sources to be generated Returns: sample: array, shape (N_loc,2) index of the selected sources in the source space """ sample = np.array([[int(0)]*2]*N_loc) for k in range(N_loc): ratio = np.inf d = 0 while ratio < 9/10 or ratio > 10/9 or d < 0.04: sample[k] = np.random.permutation(G.shape[1])[0: 2] norm1 = np.linalg.norm(np.transpose(G)[sample[k, 0]]) norm2 = np.linalg.norm(np.transpose(G)[sample[k, 1]]) ratio = norm1/norm2 d = np.linalg.norm(src[sample[k, 0]]-src[sample[k, 1]]) return sample .. GENERATED FROM PYTHON SOURCE LINES 51-95 .. code-block:: default def gen_ar_model(N_act, P, Sigma): """ Generate an MVAR model with directional coupling from source 1 to source 2, the third source, if present, is uncorrelated to the the first two Parameters: N_act: 2, 3 dimension of the model (i.e. number of time courses to be simulated) P: int order of the model Sigma: float (in (0.1,1)) variance of the non zero entries of the model Returns: model: dictionary Dictionary containing:\n * the matrices that define the model * N_act * P """ lambdamax = 10 I = np.eye(N_act) vect = (np.array([int(1)]), np.array([int(0)])) index = np.where(I == 1) vect = (np.concatenate((vect[0], index[0])), np.concatenate((vect[1], index[1]))) while lambdamax < 0.9 or lambdamax >= 1: Arsig = np.array([[], []]) for k in range(P): aloc = np.zeros([N_act, N_act]) for s in range(vect[0].shape[0]): aloc[vect[0][s], vect[1][s]] = np.random.randn(1)[0]*Sigma Arsig = np.concatenate((Arsig, aloc), axis=1) E = np.eye(N_act*P) AA = np.concatenate((Arsig, E[0: -N_act, :])) Lambda = np.linalg.eig(AA)[0] lambdamax = max(abs(Lambda)) model = {'Arsig': Arsig, 'N_act': N_act, 'P': P} return model .. GENERATED FROM PYTHON SOURCE LINES 96-134 .. code-block:: default def gen_ar_series(AR_mod, T): """ Generate an MVAR time courses Parameters: AR_mod: dictionary (output of gen_AR_model) Dictionary containing:\n * the matrices that define the model * N_act * P T: int number of time points of the generated time courses Returns: X: array, shape (N_act, T) the generated time courses AR_mod: dictionary Dictionary containing:\n * the matrices that define the model * N_act * P """ Sigma = 1 N0 = 1000 N_act = AR_mod['N_act'] P = AR_mod['P'] Arsig = AR_mod['Arsig'] x = np.random.randn(N_act, T+N0)*Sigma AR_mod['noise'] = x[:, P:].copy() y = x.copy() for k in range(P, T+N0): yloc = np.concatenate(np.flip(y[:, k-P: k], axis=1).T, axis=None) y[:, k] = Arsig.dot(yloc) + x[:, k] X = y[:, N0:].copy() return X, AR_mod .. GENERATED FROM PYTHON SOURCE LINES 135-179 .. code-block:: default def gen_background_tcs(P, N_bg, T): """ Generate background noise time courses following univariate AR models Parameters: P: int order of the AR model N_bg: int number of time coursed to generate T: int number of time points of the generate time courses Returns: bg_tcs: array, shape (N_bg, T) the generated time courses """ bg_tcs = np.zeros((N_bg, T)) for i_bg in range(N_bg): # genarate the model lambdamax = 10 while lambdamax < 0.9 or lambdamax >= 1: Arsig = np.zeros([1, 0]) for k in range(P): aloc = np.random.randn(1, 1) Arsig = np.concatenate((Arsig, aloc), axis=1) E = np.eye(P) AA = np.concatenate((Arsig, E[0: -1, :])) Lambda = np.linalg.eig(AA)[0] lambdamax = max(abs(Lambda)) # simulate tc Sigma = 1 N0 = 1000 x = np.random.randn(1, T+N0)*Sigma y = x.copy() for k in range(P, T+N0): yloc = np.concatenate(np.flip(y[:, k-P:k], axis=1).T, axis=None) y[:, k] = Arsig.dot(yloc) + x[:, k] bg_tcs[i_bg, :] = y[:, N0:].copy() return bg_tcs .. GENERATED FROM PYTHON SOURCE LINES 180-209 .. code-block:: default def gen_patches_sources(cortico_dist, patch_radius, seed_loc): """ Define sources composing a patch with specific distance from the center Parameters: cortico_dist: array, shape (N_source, N_source) cortico distances between each pair of sources in the source space patch_radius: float radius of the patch seed_loc: array, touple, list, shape (2,) index of the seed sources Returns: p1_locs: array, shape (N_p1,) the index of the sutces within the first patch p1_locs: array, shape (N_p2,) the index of the sutces within the second patch """ tmp_id_0 = np.argsort(cortico_dist[:, seed_loc[0]]) n_sources_0 = len(np.where(cortico_dist[:, seed_loc[0]] < patch_radius)[0]) tmp_id_1 = np.argsort(cortico_dist[:, seed_loc[1]]) n_sources_1 = len(np.where(cortico_dist[:, seed_loc[1]] < patch_radius)[0]) p1_locs = tmp_id_0[0: n_sources_0] p2_locs = tmp_id_1[0: n_sources_1] return p1_locs, p2_locs .. GENERATED FROM PYTHON SOURCE LINES 210-319 .. code-block:: default def gen_coherent_patches(seed_tc, p1_locs, p2_locs, c, i_c, nperseg, nfft, fs, fmin, fmax): """ Generate time courses to be assigned to the sources within the patches. The patches are Gaussian and have specific intracoherence level Parameters: seed_tc: array, shape (2, T) time courses of the seeds of each patch p1_locs: array, shpae (N_p1) indeces of the sources within patch 1 p2_locs: array, shpae (N_p2) indeces of the sources within patch 2 c: 1, 0.5, 2 coeherence level i_c: not used anymore nperseg: int lenght of the epochs for the computation of the coherence nfft: int number of frequencies fs: float sampling frequency fmin: float minimum frequency for the computation of coherence fmax: float maximum frequency for the computation of coherence Returns: p1_tcs: array, shape (N_p1,T) time courses of the sources in patch 1 p2_tcs: array, shape (N_p2,T) time courses of the sources in patch 2 """ # define gaussian window std_dev1 = (len(p1_locs)-1)/(np.sqrt(-2*np.log(0.4))) # standard deviation # patch 1 w1 = signal.gaussian(len(p1_locs)*2-1, std_dev1) # weights patch 1 w1 = w1[(len(p1_locs)-1):] std_dev2 = (len(p2_locs)-1)/(np.sqrt(-2*np.log(0.4))) # standard deviation # patch 2 w2 = signal.gaussian(len(p2_locs)*2-1, std_dev2) # weights patch 1 w2 = w2[(len(p2_locs)-1):] T = seed_tc.shape[1] if c == 1: p1_tcs = np.tile(seed_tc[0, :].reshape((1, -1)), [len(p1_locs), 1]) p2_tcs = np.tile(seed_tc[1, :].reshape((1, -1)), [len(p2_locs), 1]) else: p1_tcs = seed_tc[0, :].reshape((1, -1)) p2_tcs = seed_tc[1, :].reshape((1, -1)) if c == 0.5: rate1 = 100 rate2 = 100 elif c == 0.2: rate1 = 500 rate2 = 300 else: print('wrong coherence level, accepted values are: 1, 0.5, 0.2') # grow patch 1 for i_source in range(1, len(p1_locs)): mean_coh = np.array([0, 1]) while (np.min(mean_coh) < c-0.2 or np.max(mean_coh) > c+0.2): new_tc = fft(seed_tc[0, :].reshape((1, -1))) +\ np.random.randn(1, T)*(np.random.randn(1)*rate1+rate2) new_tc = np.real(ifft(new_tc))*w1[i_source] *\ (np.linalg.norm(seed_tc[0, :]) / np.linalg.norm(np.real(ifft(new_tc)))) f, conn = signal.coherence(p1_tcs, new_tc, fs=fs, window='hann', nperseg=nperseg, noverlap=nperseg//2, nfft=nperseg, detrend='constant', axis=-1) i_f_in = np.where((f >= fmin) & (f <= fmax))[0] mean_coh = abs(conn[:, i_f_in]).mean(axis=-1).copy() p1_tcs = np.append(p1_tcs, new_tc, axis=0) # grow patch 2 for i_source in range(1, len(p2_locs)): mean_coh = np.array([0, 1]) while (np.min(mean_coh) < c-0.2 or np.max(mean_coh) > c+0.2): new_tc = fft(seed_tc[1, :].reshape((1, -1))) +\ np.random.randn(1, T) * (np.random.randn(1) * rate1+rate2) new_tc = np.real(ifft(new_tc)) * w2[i_source] *\ (np.linalg.norm(seed_tc[1, :]) / np.linalg.norm(np.real(ifft(new_tc)))) f, conn = signal.coherence(p2_tcs, new_tc, fs=fs, window='hann', nperseg=nperseg, noverlap=nperseg//2, nfft=nperseg, detrend='constant', axis=-1) i_f_in = np.where((f >= fmin) & (f <= fmax))[0] mean_coh = abs(conn[:, i_f_in]).mean(axis=-1).copy() p2_tcs = np.append(p2_tcs, new_tc, axis=0) p1_tcs_norm = np.linalg.norm(p1_tcs, ord='fro') p2_tcs_norm = np.linalg.norm(p2_tcs, ord='fro') return p1_tcs/p1_tcs_norm, p2_tcs/p2_tcs_norm .. GENERATED FROM PYTHON SOURCE LINES 320-354 .. code-block:: default def err_X(Lambda, X, Y, G, GGt): """ Error in estimating X using the regularization parameter Lambda _ 2 err_X = \ ||X_Lambda(t)-X(t)|| /_t Parameters: Lambda: float (>0) regularization parameter X: array, shape (N_source,T) activity of N_source sources Y: array, shape (N_sensor,T) Sensor level regordings G: array, shape (N_sensor,N_source) leadfiel matrix GGt: array, shape (N_sensor, N_sensor) GGt = G*G^T Returns: value: float error """ T = X.shape[1] if Lambda < 0: value = np.inf else: I = np.eye(G.shape[0]) W_tik = G.T.dot(np.linalg.inv(GGt+Lambda*I)) value = (1/(T*G.shape[1]))*np.linalg.norm(W_tik.dot(Y)-X, ord='fro')**2 return value .. GENERATED FROM PYTHON SOURCE LINES 355-487 .. code-block:: default def auc(Lambda, method, G, GGt, Y, p1_sources, p2_sources, fmin, fmax, PN_matrix, fs, nperseg): """ Compute Area Under the Curve (AUC) Parameters: Lambda: float (>0) regularization parameter method: string | list/tuple of strings connectivity metrics for which compute the AUC. Valid methods are 'cpsd', 'coh', 'cohy', 'imcoh', 'plv', 'ciplv', 'ppc', 'pli', 'pli2_unbiased', 'wpli', 'wpli2_debiased' G: array, shape (N_sensor,N_source) leadfiel matrix GGt: array, shape (N_sensor, N_sensor) GGt = G*G^T Y: array, shape (N_sensor,T) Sensor level recordings p1_sources: array, list, tuple, shape (N_p1,) indeces of the sources within patch 1 p2_sources: array, list, tuple, shape (N_p2,) indeces of the sources within patch 2 fmin: float minimum frequency for the computation of coherence fmax: float maximum frequency for the computation of coherence PN_matrix: array, shape (N_p1, N_dense-N_p1) matrix indicating which sources are connected (i.e. sources within patch 1 with sources within patch 2) fs: float sampling frequency nperseg: int lenght of the epochs for the computation of the coherence Returns: AUC_value: float Area under the curve TPF_all: array, shape (number of connectivity metrics, 20) True positive fraction for each connectivity metric and for each threshold level FPF_all: array, shape (number of connectivity metrics, 20) False positive fraction for each connectivity metric and for each threshold level """ if Lambda < 0: value = 0 else: if type(method) is str: method = [method] nfft = nperseg # number of frequencies M = G.shape[0] N = G.shape[1] T = Y.shape[1] X_lam = G.T.dot(np.linalg.inv(GGt+Lambda*np.eye(M))).dot(Y) Conn_lam = [np.zeros((len(p1_sources), N))]*len(method) if 'cpsd' in method: method_conn = method.copy() method_conn.remove('cpsd') conn_cpsd = np.zeros((len(p1_sources), N)) for k in range(len(p1_sources)): f, Connlam_row = signal.csd(X_lam[p1_sources[k], :], X_lam, fs=fs, window='hann', nperseg=nperseg, noverlap=nperseg // 2, nfft=nfft, detrend='constant', return_onesided=True, scaling='density', axis=-1) f_in = np.intersect1d(np.where(f >= fmin)[0], np.where(f <= fmax)[0]) conn_cpsd[k, :] = np.mean(abs(Connlam_row[:, f_in]), axis=-1) Conn_lam[method.index('cpsd')] = np.delete(conn_cpsd, p1_sources, axis=-1) else: method_conn = method if len(method_conn) > 0: noverlap = nperseg//2 nepo = T//(nperseg-noverlap)-1 X_lam_re = np.zeros((nepo, N, nperseg)) for i_epo in range(nepo): X_lam_re[i_epo, :, :] = X_lam[:, (nperseg-noverlap)*i_epo: (nperseg-noverlap)*i_epo+nperseg] indices = mne_connectivity.seed_target_indices(p1_sources, np.arange(N)) Conn_object = mne_connectivity.spectral_connectivity_epochs( X_lam_re, method=method_conn, mode='fourier', indices=indices, sfreq=fs, fmin=fmin, fmax=fmax, faverage=False, verbose=False) for i_conn in range(len(method_conn)): Conn = Conn_object[i_conn].xarray.data Conn_lam[method.index(method_conn[i_conn])] =\ np.reshape(abs(Conn).mean(axis=-1).copy(), (len(p1_sources), N)) Conn_lam[method.index(method_conn[i_conn])] =\ np.delete(Conn_lam[method.index(method_conn[i_conn])], p1_sources, axis=-1) AUC_value = np.zeros((len(method))) Alpha = np.linspace(np.finfo(float).eps, 1, 20) Alpha = np.flip(Alpha) TPF_all = np.zeros((len(method), len(Alpha))) FPF_all = np.zeros((len(method), len(Alpha))) for i_conn in range(len(method)): TPF = np.zeros(len(Alpha)) FPF = np.zeros(len(Alpha)) for i_alp in range(len(Alpha)): aux = np.where(abs(Conn_lam[i_conn]) >= Alpha[i_alp] * np.max(abs(Conn_lam[i_conn])), abs(Conn_lam[i_conn]), 0) PN_evaluate = np.where(aux == 0, aux, 1) TP = np.count_nonzero(PN_matrix == np.where(PN_evaluate == 1, PN_evaluate, -1)) FP = np.count_nonzero(PN_evaluate)-TP FPF[i_alp] = FP / (PN_matrix.shape[0] * PN_matrix.shape[1] - np.count_nonzero(PN_matrix)) TPF[i_alp] = TP / np.count_nonzero(PN_matrix) TPF_all[i_conn, :] = TPF FPF_all[i_conn, :] = FPF AUC_value[i_conn] = -np.trapz(TPF, FPF) return(AUC_value, TPF_all, FPF_all) .. rst-class:: sphx-glr-timing **Total running time of the script:** ( 0 minutes 0.000 seconds) .. _sphx_glr_download_auto_paper_results_funcs_single_sim.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: funcs_single_sim.py ` .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: funcs_single_sim.ipynb ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_