Skip to content

Commit 79af74a

Browse files
committed
Capturing current development.
1 parent e02db92 commit 79af74a

File tree

3 files changed

+135
-49
lines changed

3 files changed

+135
-49
lines changed

comex/src-oshmem/comex.c

Lines changed: 84 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,9 @@ int comex_acc(int op, void *scale, void *src, void *dst, int bytes,
501501

502502
/* if target is local, perform local accumulate directly */
503503
if (pe == l_state.pe) {
504+
locks_set_internal(pe);
504505
_acc(op, bytes, dst, src, scale);
506+
locks_clear_internal(pe);
505507
return COMEX_SUCCESS;
506508
}
507509

@@ -611,7 +613,13 @@ int comex_accs(int op, void *scale, void *src, int *src_stride,
611613
int comex_accv(int op, void *scale, comex_giov_t *darr, int len,
612614
int proc, comex_group_t group) {
613615
for (int i = 0; i < len; ++i) {
614-
comex_acc(op, scale, darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group);
616+
void **src = darr[i].src;
617+
void **dst = darr[i].dst;
618+
int bytes = darr[i].bytes;
619+
int limit = darr[i].count;
620+
for (int j = 0; j < limit; ++j) {
621+
comex_acc(op, scale, src[j], dst[j], bytes, proc, group);
622+
}
615623
}
616624
return COMEX_SUCCESS;
617625
}
@@ -679,8 +687,14 @@ int comex_puts(void *src, int *src_stride, void *dst, int *dst_stride,
679687
int comex_putv(comex_giov_t *darr, int len, int proc, comex_group_t group) {
680688
if (!darr || len <= 0) return COMEX_FAILURE;
681689
for (int i = 0; i < len; ++i) {
682-
int rc = comex_put(darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group);
683-
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
690+
void **src = darr[i].src;
691+
void **dst = darr[i].dst;
692+
int bytes = darr[i].bytes;
693+
int limit = darr[i].count;
694+
for (int j = 0; j < limit; ++j) {
695+
int rc = comex_put(src[j], dst[j], bytes, proc, group);
696+
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
697+
}
684698
}
685699
return COMEX_SUCCESS;
686700
}
@@ -727,8 +741,14 @@ int comex_gets(void *src, int *src_stride, void *dst, int *dst_stride,
727741
int comex_getv(comex_giov_t *darr, int len, int proc, comex_group_t group) {
728742
if (!darr || len <= 0) return COMEX_FAILURE;
729743
for (int i = 0; i < len; ++i) {
730-
int rc = comex_get(darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group);
731-
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
744+
void **src = darr[i].src;
745+
void **dst = darr[i].dst;
746+
int bytes = darr[i].bytes;
747+
int limit = darr[i].count;
748+
for (int j = 0; j < limit; ++j) {
749+
int rc = comex_get(src[j], dst[j], bytes, proc, group);
750+
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
751+
}
732752
}
733753
return COMEX_SUCCESS;
734754
}
@@ -798,10 +818,16 @@ int comex_nbputv(comex_giov_t *darr, int len, int proc, comex_group_t group,
798818
if (!darr || len <= 0) return COMEX_FAILURE;
799819
comex_request_t last = -1;
800820
for (int i = 0; i < len; ++i) {
801-
comex_request_t h = -1;
802-
int rc = comex_nbput(darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group, &h);
803-
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
804-
last = h;
821+
void **src = darr[i].src;
822+
void **dst = darr[i].dst;
823+
int bytes = darr[i].bytes;
824+
int limit = darr[i].count;
825+
for (int j = 0; j < limit; ++j) {
826+
comex_request_t h = -1;
827+
int rc = comex_nbput(src[j], dst[j], bytes, proc, group, &h);
828+
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
829+
last = h;
830+
}
805831
}
806832
if (nb_handle) *nb_handle = last;
807833
return COMEX_SUCCESS;
@@ -876,10 +902,16 @@ int comex_nbgetv(comex_giov_t *darr, int len, int proc, comex_group_t group,
876902
if (!darr || len <= 0) return COMEX_FAILURE;
877903
comex_request_t last = -1;
878904
for (int i = 0; i < len; ++i) {
879-
comex_request_t h = -1;
880-
int rc = comex_nbget(darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group, &h);
881-
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
882-
last = h;
905+
void **src = darr[i].src;
906+
void **dst = darr[i].dst;
907+
int bytes = darr[i].bytes;
908+
int limit = darr[i].count;
909+
for (int j = 0; j < limit; ++j) {
910+
comex_request_t h = -1;
911+
int rc = comex_nbget(src[j], dst[j], bytes, proc, group, &h);
912+
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
913+
last = h;
914+
}
883915
}
884916
if (nb_handle) *nb_handle = last;
885917
return COMEX_SUCCESS;
@@ -927,7 +959,9 @@ int comex_nbacc(int op, void *scale, void *src, void *dst, int bytes,
927959
if (pe == l_state.pe) {
928960
/* local target: perform synchronous accumulate now and leave aux NULL
929961
* so comex_wait will simply release the NB entry. */
962+
locks_set_internal(pe);
930963
_acc(op, bytes, dst, src, aux->scale);
964+
locks_clear_internal(pe);
931965
free(aux->scale);
932966
free(aux);
933967
e->aux = NULL;
@@ -940,10 +974,10 @@ int comex_nbacc(int op, void *scale, void *src, void *dst, int bytes,
940974
aux->tmp = malloc((size_t)bytes);
941975
if (!aux->tmp) { free(aux->scale); free(aux); comex_nb_release(idx); return COMEX_FAILURE; }
942976
e->aux = aux;
943-
/* blocking get the remote destination into tmp (shmem_getmem blocks
944-
* until the data is available locally; an immediate shmem_quiet() is
945-
* therefore unnecessary). */
946-
shmem_getmem(aux->tmp, dst, bytes, pe);
977+
/* blocking get the remote destination into tmp (shmem_getmem blocks
978+
* until the data is available locally; an immediate shmem_quiet() is
979+
* therefore unnecessary). */
980+
shmem_getmem(aux->tmp, dst, bytes, pe);
947981
/* perform local accumulate into tmp */
948982
_acc(op, bytes, aux->tmp, src, aux->scale);
949983
/* issue non-blocking put of updated tmp back to remote destination */
@@ -1017,10 +1051,16 @@ int comex_nbaccv(int op, void *scale, comex_giov_t *darr, int len,
10171051
int proc, comex_group_t group, comex_request_t* nb_handle) {
10181052
comex_request_t last = -1;
10191053
for (int i = 0; i < len; ++i) {
1020-
comex_request_t h = -1;
1021-
int rc = comex_nbacc(op, scale, darr[i].src[0], darr[i].dst[0], darr[i].bytes, proc, group, &h);
1022-
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
1023-
last = h;
1054+
void **src = darr[i].src;
1055+
void **dst = darr[i].dst;
1056+
int bytes = darr[i].bytes;
1057+
int limit = darr[i].count;
1058+
for (int j = 0; j < limit; ++j) {
1059+
comex_request_t h = -1;
1060+
int rc = comex_nbacc(op, scale, src[j], dst[j], bytes, proc, group, &h);
1061+
if (rc != COMEX_SUCCESS) return COMEX_FAILURE;
1062+
last = h;
1063+
}
10241064
}
10251065
if (nb_handle) *nb_handle = last;
10261066
return COMEX_SUCCESS;
@@ -1032,7 +1072,29 @@ int comex_malloc_mem_dev(void **ptr_arr, size_t bytes, comex_group_t group, cons
10321072
}
10331073

10341074
int comex_rmw(int op, void *ploc, void *prem, int extra, int proc, comex_group_t group) {
1035-
(void)op; (void)ploc; (void)prem; (void)extra; (void)proc; (void)group;
1075+
int world_proc;
1076+
if (comex_group_translate_world(group, proc, &world_proc) != COMEX_SUCCESS) {
1077+
comex_error("comex_rmw: group translation failed", -1);
1078+
return COMEX_FAILURE;
1079+
}
1080+
1081+
switch (op) {
1082+
case COMEX_FETCH_AND_ADD:
1083+
*(int*)ploc = shmem_int_fadd((int*)prem, extra, world_proc);
1084+
break;
1085+
case COMEX_FETCH_AND_ADD_LONG:
1086+
*(long*)ploc = shmem_long_fadd((long*)prem, (long)extra, world_proc);
1087+
break;
1088+
case COMEX_SWAP:
1089+
*(int*)ploc = shmem_int_swap((int*)prem, *(int*)ploc, world_proc);
1090+
break;
1091+
case COMEX_SWAP_LONG:
1092+
*(long*)ploc = shmem_long_swap((long*)prem, *(long*)ploc, world_proc);
1093+
break;
1094+
default:
1095+
comex_error("comex_rmw: unknown op", op);
1096+
return COMEX_FAILURE;
1097+
}
10361098
return COMEX_SUCCESS;
10371099
}
10381100

comex/src-oshmem/locks.c

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,49 @@
44
#include <stdlib.h>
55
#include <shmem.h>
66

7-
/* Internal lock storage (symmetric) */
7+
8+
/* Internal lock storage (regular memory, not symmetric) */
89
static long *g_locks = NULL;
9-
static int g_num_mutexes = 0;
10-
static int g_total_per_pe = 0;
10+
static int g_locks_npes = 0;
11+
static int *g_locks_map = NULL;
1112

1213
int comex_create_mutexes(int num) {
13-
if (num < 0) return COMEX_FAILURE;
14-
g_num_mutexes = num;
15-
g_total_per_pe = l_state.n_pes + g_num_mutexes;
16-
size_t total = (size_t)l_state.n_pes * (size_t)g_total_per_pe;
17-
g_locks = (long*)shmem_malloc(sizeof(long) * total);
18-
if (!g_locks) return COMEX_FAILURE;
19-
for (size_t i = 0; i < total; ++i) g_locks[i] = (long)i;
20-
shmem_barrier_all();
14+
int npes = l_state.n_pes;
15+
int pe = l_state.pe;
16+
g_locks_npes = npes;
17+
if (g_locks) {
18+
shmem_free(g_locks);
19+
g_locks = NULL;
20+
}
21+
if (g_locks_map) {
22+
free(g_locks_map);
23+
g_locks_map = NULL;
24+
}
25+
// Step 1: create and sum num_mutexes
26+
int *num_mutexes = (int*)shmem_malloc(sizeof(int) * npes);
27+
if (!num_mutexes) return COMEX_FAILURE;
28+
for (int i = 0; i < npes; ++i) num_mutexes[i] = 0;
29+
num_mutexes[pe] = num;
30+
int *sum_mutexes = (int*)shmem_malloc(sizeof(int) * npes);
31+
if (!sum_mutexes) { shmem_free(num_mutexes); return COMEX_FAILURE; }
32+
shmem_int_sum_to_all(sum_mutexes, num_mutexes, npes, 0, 0, npes, NULL, NULL);
33+
// Step 2: allocate g_locks_map and fill as described
34+
g_locks_map = (int*)malloc(sizeof(int) * (npes + 1));
35+
if (!g_locks_map) { shmem_free(num_mutexes); shmem_free(sum_mutexes); return COMEX_FAILURE; }
36+
g_locks_map[0] = npes;
37+
for (int i = 1; i <= npes; ++i) {
38+
g_locks_map[i] = g_locks_map[i-1] + sum_mutexes[i-1];
39+
}
40+
shmem_free(num_mutexes);
41+
shmem_free(sum_mutexes);
42+
if (g_locks) {
43+
shmem_free(g_locks);
44+
g_locks = NULL;
45+
}
46+
size_t locks_len = g_locks_map[npes];
47+
g_locks = (long*)shmem_malloc(sizeof(long) * locks_len);
48+
if (!g_locks) { free(g_locks_map); g_locks_map = NULL; return COMEX_FAILURE; }
49+
for (size_t i = 0; i < locks_len; ++i) g_locks[i] = (long)i;
2150
return COMEX_SUCCESS;
2251
}
2352

@@ -26,42 +55,38 @@ int comex_destroy_mutexes(void) {
2655
shmem_free(g_locks);
2756
g_locks = NULL;
2857
}
29-
g_num_mutexes = 0;
30-
shmem_barrier_all();
58+
if (g_locks_map) {
59+
free(g_locks_map);
60+
g_locks_map = NULL;
61+
}
62+
g_locks_npes = 0;
3163
return COMEX_SUCCESS;
3264
}
3365

3466
int comex_lock(int mutex, int proc) {
35-
if (g_total_per_pe <= 0) return COMEX_FAILURE;
3667
if (proc < 0 || proc >= l_state.n_pes) return COMEX_FAILURE;
37-
if (mutex < 0 || mutex >= g_num_mutexes) return COMEX_FAILURE;
38-
long imutex = (long)(proc + 1) * (long)l_state.n_pes + (long)mutex;
39-
if ((size_t)imutex >= (size_t)g_total_per_pe) return COMEX_FAILURE;
68+
if (mutex < 0 || mutex >= (g_locks_map[proc+1] - g_locks_map[proc])) return COMEX_FAILURE;
69+
long imutex = g_locks_map[proc] + mutex;
4070
shmem_set_lock(&g_locks[imutex]);
4171
return COMEX_SUCCESS;
4272
}
4373

4474
int comex_unlock(int mutex, int proc) {
45-
if (g_total_per_pe <= 0) return COMEX_FAILURE;
4675
if (proc < 0 || proc >= l_state.n_pes) return COMEX_FAILURE;
47-
if (mutex < 0 || mutex >= g_num_mutexes) return COMEX_FAILURE;
48-
long imutex = (long)(proc + 1) * (long)l_state.n_pes + (long)mutex;
49-
if ((size_t)imutex >= (size_t)g_total_per_pe) return COMEX_FAILURE;
76+
if (mutex < 0 || mutex >= (g_locks_map[proc+1] - g_locks_map[proc])) return COMEX_FAILURE;
77+
long imutex = g_locks_map[proc] + mutex;
5078
shmem_clear_lock(&g_locks[imutex]);
5179
return COMEX_SUCCESS;
5280
}
5381

5482
void locks_set_internal(int pe) {
5583
if (!g_locks) return;
5684
if (pe < 0 || pe >= l_state.n_pes) return;
57-
/* reserved internal slot is the first slot for each PE */
58-
size_t idx = (size_t)pe * (size_t)g_total_per_pe + 0;
59-
shmem_set_lock(&g_locks[idx]);
85+
shmem_set_lock(&g_locks[pe]);
6086
}
6187

6288
void locks_clear_internal(int pe) {
6389
if (!g_locks) return;
6490
if (pe < 0 || pe >= l_state.n_pes) return;
65-
size_t idx = (size_t)pe * (size_t)g_total_per_pe + 0;
66-
shmem_clear_lock(&g_locks[idx]);
91+
shmem_clear_lock(&g_locks[pe]);
6792
}

global/src/global.nalg.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,6 @@ int local_sync_begin,local_sync_end;
836836
da = (double*)ptr_a;
837837
db = (double*)ptr_b;
838838
dc = (double*)ptr_c;
839-
printf("p[%d] (add) adding elements\n",pnga_nodeid());
840839
for(i=0; i<elems; i++)
841840
dc[i] = *(double*)alpha *da[i] +
842841
*(double*)beta * db[i];

0 commit comments

Comments
 (0)