From 74fd61cba12fb6592a832903f3d23601ebaf3ec9 Mon Sep 17 00:00:00 2001 From: domfournier Date: Mon, 8 Sep 2025 21:25:27 -0700 Subject: [PATCH 1/4] Try halving the threads --- simpeg/dask/electromagnetics/time_domain/simulation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simpeg/dask/electromagnetics/time_domain/simulation.py b/simpeg/dask/electromagnetics/time_domain/simulation.py index 9905a1cc03..903e0b88fc 100644 --- a/simpeg/dask/electromagnetics/time_domain/simulation.py +++ b/simpeg/dask/electromagnetics/time_domain/simulation.py @@ -71,7 +71,7 @@ def getSourceTerm(self, tInd): source_list = self.survey.source_list source_block = np.array_split( - np.arange(len(source_list)), self.n_threads(client=client) + np.arange(len(source_list)), int(self.n_threads(client=client) / 2) ) if client: From 0c959215b8441e023ba4005d3862f96c607c3ee1 Mon Sep 17 00:00:00 2001 From: domfournier Date: Wed, 10 Sep 2025 14:50:37 -0700 Subject: [PATCH 2/4] TRy with a flat list of sources --- .../time_domain/simulation.py | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/simpeg/dask/electromagnetics/time_domain/simulation.py b/simpeg/dask/electromagnetics/time_domain/simulation.py index 903e0b88fc..f700c5769d 100644 --- a/simpeg/dask/electromagnetics/time_domain/simulation.py +++ b/simpeg/dask/electromagnetics/time_domain/simulation.py @@ -70,9 +70,6 @@ def getSourceTerm(self, tInd): sim = self source_list = self.survey.source_list - source_block = np.array_split( - np.arange(len(source_list)), int(self.n_threads(client=client) / 2) - ) if client: sim = client.scatter(self, workers=self.worker) @@ -82,22 +79,19 @@ def getSourceTerm(self, tInd): sim = self block_compute = [] - for block in source_block: + for source in source_list: if client: block_compute.append( client.submit( source_evaluation, sim, - block, self.times[tInd], - source_list, + source, workers=self.worker, ) ) else: - block_compute.append( - delayed_source_eval(self, block, self.times[tInd], source_list) - ) + block_compute.append(delayed_source_eval(self, self.times[tInd], source)) if client: blocks = client.gather(block_compute) @@ -283,12 +277,12 @@ def field_projection(field_array, src_list, array_ind, time_ind, func): return new_array -def source_evaluation(simulation, indices, time_channel, sources): +def source_evaluation(simulation, time_channel, source): s_m, s_e = [], [] - for ind in indices: - sm, se = sources[ind].eval(simulation, time_channel) - s_m.append(sm) - s_e.append(se) + + sm, se = source.eval(simulation, time_channel) + s_m.append(sm) + s_e.append(se) return s_m, s_e From 8bce08b18bd9609e198039885a3c751475080db9 Mon Sep 17 00:00:00 2001 From: domfournier Date: Wed, 10 Sep 2025 16:09:02 -0700 Subject: [PATCH 3/4] Temp remove of parallel process --- .../time_domain/simulation.py | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/simpeg/dask/electromagnetics/time_domain/simulation.py b/simpeg/dask/electromagnetics/time_domain/simulation.py index f700c5769d..cc6db014f4 100644 --- a/simpeg/dask/electromagnetics/time_domain/simulation.py +++ b/simpeg/dask/electromagnetics/time_domain/simulation.py @@ -62,41 +62,42 @@ def getSourceTerm(self, tInd): elif getattr(self, "_stashed_sources", None) is None: self._stashed_sources = {} - try: - client = get_client() - sim = client.scatter(self, workers=self.worker) - except ValueError: - client = None - sim = self - - source_list = self.survey.source_list - - if client: - sim = client.scatter(self, workers=self.worker) - source_list = client.scatter(source_list, workers=self.worker) - else: - delayed_source_eval = delayed(source_evaluation) - sim = self - - block_compute = [] - for source in source_list: - if client: - block_compute.append( - client.submit( - source_evaluation, - sim, - self.times[tInd], - source, - workers=self.worker, - ) - ) - else: - block_compute.append(delayed_source_eval(self, self.times[tInd], source)) - - if client: - blocks = client.gather(block_compute) - else: - blocks = dask.compute(block_compute)[0] + # try: + # client = get_client() + # sim = client.scatter(self, workers=self.worker) + # except ValueError: + # client = None + # sim = self + # + # source_list = self.survey.source_list + # + # if client: + # sim = client.scatter(self, workers=self.worker) + # source_list = client.scatter(source_list, workers=self.worker) + # else: + # delayed_source_eval = delayed(source_evaluation) + # sim = self + + blocks = [] + for source in self.survey.source_list: + blocks.append(source_evaluation(self, self.times[tInd], source)) + # if client: + # block_compute.append( + # client.submit( + # source_evaluation, + # sim, + # self.times[tInd], + # source, + # workers=self.worker, + # ) + # ) + # else: + # block_compute.append(delayed_source_eval(self, self.times[tInd], source)) + # + # if client: + # blocks = client.gather(block_compute) + # else: + # blocks = dask.compute(block_compute)[0] s_m, s_e = [], [] for block in blocks: From c9c2188d47c49145d0a9f0486719ce0c1e268d26 Mon Sep 17 00:00:00 2001 From: domfournier Date: Thu, 11 Sep 2025 20:32:06 -0700 Subject: [PATCH 4/4] Clean out comments --- .../time_domain/simulation.py | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/simpeg/dask/electromagnetics/time_domain/simulation.py b/simpeg/dask/electromagnetics/time_domain/simulation.py index cc6db014f4..19315ff3b0 100644 --- a/simpeg/dask/electromagnetics/time_domain/simulation.py +++ b/simpeg/dask/electromagnetics/time_domain/simulation.py @@ -62,42 +62,9 @@ def getSourceTerm(self, tInd): elif getattr(self, "_stashed_sources", None) is None: self._stashed_sources = {} - # try: - # client = get_client() - # sim = client.scatter(self, workers=self.worker) - # except ValueError: - # client = None - # sim = self - # - # source_list = self.survey.source_list - # - # if client: - # sim = client.scatter(self, workers=self.worker) - # source_list = client.scatter(source_list, workers=self.worker) - # else: - # delayed_source_eval = delayed(source_evaluation) - # sim = self - blocks = [] for source in self.survey.source_list: blocks.append(source_evaluation(self, self.times[tInd], source)) - # if client: - # block_compute.append( - # client.submit( - # source_evaluation, - # sim, - # self.times[tInd], - # source, - # workers=self.worker, - # ) - # ) - # else: - # block_compute.append(delayed_source_eval(self, self.times[tInd], source)) - # - # if client: - # blocks = client.gather(block_compute) - # else: - # blocks = dask.compute(block_compute)[0] s_m, s_e = [], [] for block in blocks: