Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ SEXP bmerge(SEXP idt, SEXP xdt, SEXP icolsArg, SEXP xcolsArg,
double dquickselect(double *x, int n);
double iquickselect(int *x, int n);
double i64quickselect(int64_t *x, int n);
double dfloyd_rivest(double *x, int n);
double ifloyd_rivest(int *x, int n);
double i64floyd_rivest(int64_t *x, int n);

// fread.c
double wallclock(void);
Expand Down
69 changes: 47 additions & 22 deletions src/gsumm.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,43 +880,68 @@ SEXP gmedian(SEXP x, SEXP narmArg) {
const bool nosubset = irowslen==-1;
switch(TYPEOF(x)) {
case REALSXP: {
double *subd = REAL(PROTECT(allocVector(REALSXP, maxgrpn))); // allocate once upfront and reuse
int64_t *xi64 = (int64_t *)REAL(x);
double *xd = REAL(x);
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else subd[j-nacount] = xd[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
double *thread_subd = malloc(maxgrpn * sizeof(double));
if (!thread_subd) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else thread_subd[j-nacount] = xd[k];
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
ansd[i] = isInt64 ?
(thisgrpsize > 100 ? i64floyd_rivest((int64_t *)thread_subd, thisgrpsize) : i64quickselect((int64_t *)thread_subd, thisgrpsize)) :
(thisgrpsize > 100 ? dfloyd_rivest(thread_subd, thisgrpsize) : dquickselect(thread_subd, thisgrpsize));
}
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
ansd[i] = (nacount && !narm) ? NA_REAL : (isInt64 ? i64quickselect((void *)subd, thisgrpsize) : dquickselect(subd, thisgrpsize));
free(thread_subd);
}}
break;
case LGLSXP: case INTSXP: {
int *subi = INTEGER(PROTECT(allocVector(INTSXP, maxgrpn)));
int *xi = INTEGER(x);
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else subi[j-nacount] = xi[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
int *thread_subi = malloc(maxgrpn * sizeof(int));
if (!thread_subi) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else thread_subi[j-nacount] = xi[k];
}
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
const int validsize = thisgrpsize-nacount;
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
ansd[i] = validsize > 100 ? ifloyd_rivest(thread_subi, validsize) : iquickselect(thread_subi, validsize);
}
}
ansd[i] = (nacount && !narm) ? NA_REAL : iquickselect(subi, thisgrpsize-nacount);
free(thread_subi);
}}
break;
default:
error(_("Type '%s' is not supported by GForce %s. Either add the prefix %s or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)), "median (gmedian)", "stats::median(.)");
}
if (!isInt64) copyMostAttrib(x, ans);
// else the integer64 class needs to be dropped since double is always returned by gmedian
UNPROTECT(2); // ans, subd|subi
UNPROTECT(1); // ans
return ans;
}

Expand Down
72 changes: 72 additions & 0 deletions src/quickselect.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,75 @@ double i64quickselect(int64_t *x, int n)
int64_t a, b;
BODY(i64swap);
}

// Floyd-Rivest selection algorithm
// https://en.wikipedia.org/wiki/Floyd%E2%80%93Rivest_algorithm

#undef FLOYD_RIVEST_BODY
#define FLOYD_RIVEST_BODY(SWAP, TYPE) \
if (n == 0) return NA_REAL; \
unsigned long med = n / 2 - (n % 2 == 0); \
unsigned long l = 0, ir = n - 1; \
while (ir > l) { \
if (ir - l > 600) { \
unsigned long nn = ir - l + 1; \
unsigned long i = med - l + 1; \
double z = log((double)nn); \
double s = 0.5 * exp(2.0*z/3.0); \
double sd = 0.5 * sqrt(z*s*(nn-s)/nn); \
if (i < nn/2) sd = -sd; \
unsigned long newL = (unsigned long)(MAX((long)l, (long)(med - i*s/nn + sd))); \
unsigned long newR = (unsigned long)(MIN((long)ir, (long)(med + (nn-i)*s/nn + sd))); \
l = newL; \
ir = newR; \
} \
TYPE pivot = x[med]; \
unsigned long i = l, j = ir; \
SWAP(x + l, x + med); \
if (x[ir] > pivot) { \
SWAP(x + ir, x + l); \
pivot = x[l]; \
} \
while (i < j) { \
SWAP(x + i, x + j); \
i++; j--; \
while (x[i] < pivot) i++; \
while (x[j] > pivot) j--; \
} \
if (x[l] == pivot) { \
SWAP(x + l, x + j); \
} else { \
j++; \
SWAP(x + j, x + ir); \
} \
if (j <= med) l = j + 1; \
if (med <= j) ir = j - 1; \
} \
a = x[med]; \
if (n % 2 == 1) { \
return (double)a; \
} else { \
b = x[med + 1]; \
for (unsigned long i = med + 2; i < n; i++) { \
if (x[i] < b) b = x[i]; \
} \
return ((double)a + (double)b) / 2.0; \
}

double dfloyd_rivest(double *x, int n)
{
double a, b;
FLOYD_RIVEST_BODY(dswap, double);
}

double ifloyd_rivest(int *x, int n)
{
int a, b;
FLOYD_RIVEST_BODY(iswap, int);
}

double i64floyd_rivest(int64_t *x, int n)
{
int64_t a, b;
FLOYD_RIVEST_BODY(i64swap, int64_t);
}
Loading