diff --git a/configure b/configure
index 44d3fae95b6..dffd3ca968b 100755
--- a/configure
+++ b/configure
@@ -817,6 +817,8 @@ enable_faultinjector
 enable_debug_extensions
 enable_pxf
 enable_gpfdist
+LIBSSH2_LIBS
+LIBSSH2_CFLAGS
 enable_strong_random
 enable_rpath
 default_port
@@ -976,6 +978,8 @@ XML2_CFLAGS
 XML2_LIBS
 LZ4_CFLAGS
 LZ4_LIBS
+LIBSSH2_LIBS
+LIBSSH2_CFLAGS
 LDFLAGS_EX
 LDFLAGS_SL
 PERL
@@ -1724,6 +1728,10 @@ Some influential environment variables:
   XML2_LIBS   linker flags for XML2, overriding pkg-config
   LZ4_CFLAGS  C compiler flags for LZ4, overriding pkg-config
   LZ4_LIBS    linker flags for LZ4, overriding pkg-config
+  LIBSSH2_CFLAGS
+              C compiler flags for LIBSSH2, overriding pkg-config
+  LIBSSH2_LIBS
+              linker flags for LIBSSH2, overriding pkg-config
   LDFLAGS_EX  extra linker flags for linking executables only
   LDFLAGS_SL  extra linker flags for linking shared libraries only
   PERL        Perl program
@@ -15218,6 +15226,88 @@ fi
   LIBS="$_LIBS"
 fi
 
+#
+# LIBSSH2
+#
+#
+if test "$enable_gpfdist" = yes; then
+  # Check libssh2 >= 1.0.0
+
+pkg_failed=no
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for libssh2 >= 1.0.0" >&5
+$as_echo_n "checking for libssh2 >= 1.0.0... " >&6; }
+
+if test -n "$LIBSSH2_CFLAGS"; then
+    pkg_cv_LIBSSH2_CFLAGS="$LIBSSH2_CFLAGS"
+ elif test -n "$PKG_CONFIG"; then
+    if test -n "$PKG_CONFIG" && \
+    { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5
+  ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5
+  ac_status=$?
+  $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
+  test $ac_status = 0; }; then
+  pkg_cv_LIBSSH2_CFLAGS=`$PKG_CONFIG --cflags "libssh2 >= 1.0.0" 2>/dev/null`
+                      test "x$?" != "x0" && pkg_failed=yes
+else
+  pkg_failed=yes
+fi
+ else
+    pkg_failed=untried
+fi
+if test -n "$LIBSSH2_LIBS"; then
+    pkg_cv_LIBSSH2_LIBS="$LIBSSH2_LIBS"
+ elif test -n "$PKG_CONFIG"; then
+    if test -n "$PKG_CONFIG" && \
+    { { $as_echo "$as_me:${as_lineno-$LINENO}: \$PKG_CONFIG --exists --print-errors \"libssh2 >= 1.0.0\""; } >&5
+  ($PKG_CONFIG --exists --print-errors "libssh2 >= 1.0.0") 2>&5
+  ac_status=$?
+  $as_echo "$as_me:${as_lineno-$LINENO}: \$? = $ac_status" >&5
+  test $ac_status = 0; }; then
+  pkg_cv_LIBSSH2_LIBS=`$PKG_CONFIG --libs "libssh2 >= 1.0.0" 2>/dev/null`
+                      test "x$?" != "x0" && pkg_failed=yes
+else
+  pkg_failed=yes
+fi
+ else
+    pkg_failed=untried
+fi
+
+
+
+if test $pkg_failed = yes; then
+        { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
+$as_echo "no" >&6; }
+
+if $PKG_CONFIG --atleast-pkgconfig-version 0.20; then
+        _pkg_short_errors_supported=yes
+else
+        _pkg_short_errors_supported=no
+fi
+        if test $_pkg_short_errors_supported = yes; then
+                LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --short-errors --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1`
+        else
+                LIBSSH2_PKG_ERRORS=`$PKG_CONFIG --print-errors --cflags --libs "libssh2 >= 1.0.0" 2>&1`
+        fi
+        # Put the nasty error message in config.log where it belongs
+        echo "$LIBSSH2_PKG_ERRORS" >&5
+
+        as_fn_error $? "libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5
+
+elif test $pkg_failed = untried; then
+        { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
+$as_echo "no" >&6; }
+        as_fn_error $? "libssh2 >= 1.0.0 is required for gpfdist support" "$LINENO" 5
+
+else
+        LIBSSH2_CFLAGS=$pkg_cv_LIBSSH2_CFLAGS
+        LIBSSH2_LIBS=$pkg_cv_LIBSSH2_LIBS
+        { $as_echo "$as_me:${as_lineno-$LINENO}: result: yes" >&5
+$as_echo "yes" >&6; }
+
+fi
+$as_echo "#define LIBSSH2 1" >>confdefs.h
+fi
+
 #
 # SSL Library
 #
diff --git a/configure.ac b/configure.ac
index 0d0529fc35f..6ea9e57fd6c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -225,6 +225,15 @@ PGAC_ARG_BOOL(enable, gpfdist, yes,
               [do not use gpfdist])
 AC_SUBST(enable_gpfdist)
 
+if test "$enable_gpfdist" = yes; then
+  # Check libssh2 >= 1.0.0
+  PKG_CHECK_MODULES([LIBSSH2], [libssh2 >= 1.0.0],
+    [AC_DEFINE([LIBSSH2], [1], [Define if libssh2 is available])],
+    [AC_MSG_ERROR([libssh2 >= 1.0.0 is required for gpfdist support])]
+)
+  AC_MSG_RESULT([checking whether to build with gpfdist support ... yes])
+fi
+
 #
 # pxf
 #
@@ -1659,6 +1668,10 @@ if test "$enable_gpfdist" = yes ; then
   EVENT_LIBS="$EVENT_LIBS"
   AC_SUBST(EVENT_LIBS)
 
+  AC_SEARCH_LIBS(libssh2_init, [ssh2], [have_libssh2=yes; LIBSSH2_LIBS=" -lssh2"], [AC_MSG_ERROR([libssh2 is required for gpfdist])])
+  AC_SUBST(LIBSSH2_LIBS)
+  AC_SUBST(have_libssh2)
+
   AC_SEARCH_LIBS(yaml_parser_initialize, [yaml], [have_yaml=yes; YAML_LIBS=" -lyaml"], [AC_MSG_WARN([libyaml is not found. disabling transformations for gpfdist.])])
   AC_SUBST(YAML_LIBS)
   AC_SUBST(have_yaml)
@@ -1957,6 +1970,7 @@ if test "$enable_gpfdist" = yes; then
   AC_CHECK_HEADERS(yaml.h, [], [AC_MSG_WARN([header file is not found. disabling transformations for gpfdist.])])
   AC_CHECK_HEADERS(event.h, [], [AC_MSG_ERROR([header file is required for gpfdist])])
+  AC_CHECK_HEADERS(libssh2.h, [], [AC_MSG_ERROR([header file libssh2.h is required for gpfdist])])
 
   ac_save_CPPFLAGS=$CPPFLAGS
   CPPFLAGS="$apr_includes $CPPFLAGS"
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 44dbe7f0e15..a10fc61a79a 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -67,6 +67,7 @@ LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
 
 # Cloudberry uses threads in the backend
 LIBS := $(LIBS) -lpthread
+LIBS := $(LIBS) -lssh2
 
 ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
diff --git a/src/backend/utils/misc/fstream/Makefile b/src/backend/utils/misc/fstream/Makefile
index c8cea254e8c..835f31c3f0f 100644
--- a/src/backend/utils/misc/fstream/Makefile
+++ b/src/backend/utils/misc/fstream/Makefile
@@ -9,7 +9,9 @@ subdir = src/backend/utils/misc/fstream
 top_builddir = ../../../../..
 include $(top_builddir)/src/Makefile.global
 
+LDFLAGS += -lssh2
+
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
 OBJS = fstream.o gfile.o
 
diff --git a/src/backend/utils/misc/fstream/fstream.c b/src/backend/utils/misc/fstream/fstream.c
index 56a730b98ad..cbb47dedef1 100644
--- a/src/backend/utils/misc/fstream/fstream.c
+++ b/src/backend/utils/misc/fstream/fstream.c
@@ -30,6 +30,8 @@
 #include
 #endif
 
+#include
+
 char* format_error(char* c1, char* c2);
 
 
@@ -147,6 +149,36 @@ static void glob_and_copyfree(glob_and_copy_t *pglob)
 	}
 }
 
+static void glob_and_copyfree_sftp(glob_and_copy_t *pglob)
+{
+	if (pglob->gl_pathc)
+	{
+		int i;
+
+		for (i = 0; i < pglob->gl_pathc; i++)
+		{
+			gfile_free(pglob->gl_pathv[i]);
+			gfile_free(pglob->gl_username[i]);
+			gfile_free(pglob->gl_passwd[i]);
+			gfile_free(pglob->gl_hostaddr[i]);
+			gfile_free(pglob->gl_port[i]);
+		}
+
+		gfile_free(pglob->gl_pathv);
+		gfile_free(pglob->gl_username);
+		gfile_free(pglob->gl_passwd);
+		gfile_free(pglob->gl_hostaddr);
+		gfile_free(pglob->gl_port);
+
+		pglob->gl_pathc = 0;
+		pglob->gl_pathv = 0;
+		pglob->gl_username = 0;
+		pglob->gl_passwd = 0;
+		pglob->gl_hostaddr = 0;
+		pglob->gl_port = 0;
+	}
+}
+
 const char*
 fstream_get_error(fstream_t*fs)
 {
@@ -315,7 +347,12 @@ int fstream_close_with_error(fstream_t* fs, char* error)
 	if(fs->buffer)
 		gfile_free(fs->buffer);
 
-	glob_and_copyfree(&fs->glob);
+	if (fs->fd.is_sftp)
+	{
+		glob_and_copyfree_sftp(&fs->glob);
+	}
+	else
+		glob_and_copyfree(&fs->glob);
 	gfile_close(&fs->fd);
 #ifdef GPFXDIST
 	/*
@@ -453,6 +490,287 @@ static int glob_path(fstream_t *fs, const char *path)
 
 	return 0;
 }
+int get_sftp_counts(const char *sftp_request)
+{
+	const char *start;
+	const char *end;
+	int count1 = 0;
+	int count2 = 0;
+
+	start = (char *)sftp_request;
+	end = (char *)sftp_request;
+
+	while (*start || *end)
+	{
+		if (*start == '<')
+			count1++;
+		start++;
+		if (*end == '>')
+			count2++;
+		end++;
+	}
+
+	if (count1 != count2)
+		return -1;
+
+	return count1;
+}
+
+/*
+ * Copy the len bytes starting at src into dst, whose capacity is dstsize,
+ * and NUL-terminate the result.  Returns 0 on success, or -1 if the field
+ * does not fit, so the fixed-size members of sftp_info_t cannot overflow.
+ */
+static int copy_uri_field(char *dst, size_t dstsize, const char *src, size_t len)
+{
+	if (len >= dstsize)
+		return -1;
+
+	memcpy(dst, src, len);
+	dst[len] = '\0';
+	return 0;
+}
+
+/*
+ * Split an "sftp://user:password@host:port/path" URI into its components.
+ * An IPv6 host must be written in brackets ("[::1]"); they are not stored.
+ *
+ * Returns 0 on success, or -2 if the URI is malformed or one of its
+ * components does not fit into the matching sftp_info_t member.
+ */
+int ParseFilePathUri(char *uri_str, sftp_info_t *info)
+{
+	static const char scheme[] = "sftp://";
+	const char *start;
+	const char *end;
+
+	if (strncmp(uri_str, scheme, sizeof(scheme) - 1) != 0)
+		return -2;
+
+	/* user name: everything up to the first ':' */
+	start = uri_str + sizeof(scheme) - 1;
+	end = strchr(start, ':');
+	if (end == NULL ||
+		copy_uri_field(info->username, sizeof(info->username), start, end - start) != 0)
+		return -2;
+
+	/* password: everything up to the next '@' */
+	start = end + 1;
+	end = strchr(start, '@');
+	if (end == NULL ||
+		copy_uri_field(info->password, sizeof(info->password), start, end - start) != 0)
+		return -2;
+
+	/* host: a bracketed IPv6 literal, or everything up to the next ':' */
+	start = end + 1;
+	if (*start == '[')
+	{
+		start++;
+		end = strchr(start, ']');
+		if (end == NULL || end[1] != ':' ||
+			copy_uri_field(info->hostaddr, sizeof(info->hostaddr), start, end - start) != 0)
+			return -2;
+
+		end++;					/* step from ']' onto the ':' before the port */
+	}
+	else
+	{
+		end = strchr(start, ':');
+		if (end == NULL ||
+			copy_uri_field(info->hostaddr, sizeof(info->hostaddr), start, end - start) != 0)
+			return -2;
+	}
+
+	/* port: everything up to the '/' that starts the remote path */
+	start = end + 1;
+	end = strchr(start, '/');
+	if (end == NULL ||
+		copy_uri_field(info->port, sizeof(info->port), start, end - start) != 0)
+		return -2;
+
+	/* remote path, including its leading '/' */
+	if (copy_uri_field(info->fpath, sizeof(info->fpath), end, strlen(end)) != 0)
+		return -2;
+
+	return 0;
+}
+
+/*
+ * qsort() comparator that orders sftp_info_t entries by host address.
+ * IPv4 literals come first, ordered by their network-order bytes; hosts
+ * that are not IPv4 literals (e.g. IPv6) follow, ordered as text.
+ */
+static int order_by_hostaddr(const void *e1, const void *e2)
+{
+	const sftp_info_t *a = (const sftp_info_t *) e1;
+	const sftp_info_t *b = (const sftp_info_t *) e2;
+	struct in_addr ip1;
+	struct in_addr ip2;
+	int is_v4_a = (inet_pton(AF_INET, a->hostaddr, &ip1) == 1);
+	int is_v4_b = (inet_pton(AF_INET, b->hostaddr, &ip2) == 1);
+	int result;
+
+	if (is_v4_a && is_v4_b)
+		result = memcmp(&ip1, &ip2, sizeof(struct in_addr));
+	else if (is_v4_a != is_v4_b)
+		result = is_v4_a ? -1 : 1;
+	else
+		result = strcmp(a->hostaddr, b->hostaddr);
+
+	if (result > 0)
+		return 1;
+	else if (result < 0)
+		return -1;
+	else
+		return 0;
+}
+
+/* Duplicate s into gfile_malloc()ed memory; returns NULL when out of memory. */
+static char *sftp_strdup(const char *s)
+{
+	size_t size = strlen(s) + 1;
+	char *copy = (char *)gfile_malloc(size);
+
+	if (copy)
+		memcpy(copy, s, size);
+	return copy;
+}
+
+/* Free the first n strings of a gl_* array (NULL slots allowed) and the array. */
+static void free_sftp_strings(char **strs, int n)
+{
+	if (strs == NULL)
+		return;
+
+	for (int i = 0; i < n; i++)
+	{
+		if (strs[i])
+			gfile_free(strs[i]);
+	}
+	gfile_free(strs);
+}
+
+/*
+ * Parse a request of the form "<sftp://...><sftp://...>" and fill fs->glob
+ * with one entry per URI, ordered by host address.
+ *
+ * Returns 0 on success.  Returns 1 on malformed input or out of memory, in
+ * which case fs->glob is left untouched and nothing is leaked.
+ */
+static int fetch_sftp_paths(fstream_t *fs, const char *path)
+{
+	sftp_info_t *infos = NULL;
+	char **unames = NULL;
+	char **passwds = NULL;
+	char **hostaddrs = NULL;
+	char **ports = NULL;
+	char **fpaths = NULL;
+	const char *cursor = path;
+	int nslots = 0;
+	int counts = get_sftp_counts(path);
+
+	if(counts == -1)
+	{
+		gfile_printf_then_putc_newline("sftp path is Non-conforming, please check ");
+		return 1;
+	}
+
+	if(counts == 0)
+	{
+		gfile_printf_then_putc_newline("sftp path is empty");
+		return 1;
+	}
+
+	infos = (sftp_info_t *)gfile_malloc(sizeof(sftp_info_t) * counts);
+	unames = (char **)gfile_malloc(sizeof(char *) * counts);
+	passwds = (char **)gfile_malloc(sizeof(char *) * counts);
+	hostaddrs = (char **)gfile_malloc(sizeof(char *) * counts);
+	ports = (char **)gfile_malloc(sizeof(char *) * counts);
+	fpaths = (char **)gfile_malloc(sizeof(char *) * counts);
+
+	if (!infos || !unames || !passwds || !hostaddrs || !ports || !fpaths)
+	{
+		gfile_printf_then_putc_newline("out of memory");
+		goto fail;
+	}
+
+	/* clear every slot so the failure path can free a partially built result */
+	for (int i = 0; i < counts; i++)
+		unames[i] = passwds[i] = hostaddrs[i] = ports[i] = fpaths[i] = NULL;
+	nslots = counts;
+
+	/* parse the text between each '<' and the '>' that follows it */
+	for (int i = 0; i < counts; i++)
+	{
+		const char *lt = strchr(cursor, '<');
+		const char *gt = lt ? strchr(lt, '>') : NULL;
+		char *req;
+		int parse_result;
+		size_t len;
+
+		if (gt == NULL)
+		{
+			gfile_printf_then_putc_newline("sftp path is Non-conforming, please check ");
+			goto fail;
+		}
+
+		len = gt - lt - 1;
+		req = (char *)gfile_malloc(len + 1);
+		if (!req)
+		{
+			gfile_printf_then_putc_newline("out of memory");
+			goto fail;
+		}
+		memcpy(req, lt + 1, len);
+		req[len] = '\0';
+		parse_result = ParseFilePathUri(req, &infos[i]);
+		gfile_free(req);
+
+		if (parse_result != 0)
+		{
+			gfile_printf_then_putc_newline("sftp path is Non-conforming, please check ");
+			goto fail;
+		}
+		cursor = gt + 1;
+	}
+
+	qsort(infos, counts, sizeof(sftp_info_t), order_by_hostaddr);
+
+	for (int j = 0; j < counts; j++)
+	{
+		unames[j] = sftp_strdup(infos[j].username);
+		passwds[j] = sftp_strdup(infos[j].password);
+		hostaddrs[j] = sftp_strdup(infos[j].hostaddr);
+		ports[j] = sftp_strdup(infos[j].port);
+		fpaths[j] = sftp_strdup(infos[j].fpath);
+
+		if (!unames[j] || !passwds[j] || !hostaddrs[j] || !ports[j] || !fpaths[j])
+		{
+			gfile_printf_then_putc_newline("out of memory!!!");
+			goto fail;
+		}
+	}
+
+	gfile_free(infos);
+
+	fs->glob.gl_pathc = counts;
+	fs->glob.gl_username = unames;
+	fs->glob.gl_passwd = passwds;
+	fs->glob.gl_hostaddr = hostaddrs;
+	fs->glob.gl_port = ports;
+	fs->glob.gl_pathv = fpaths;
+	return 0;
+
+fail:
+	if (infos)
+		gfile_free(infos);
+	free_sftp_strings(unames, nslots);
+	free_sftp_strings(passwds, nslots);
+	free_sftp_strings(hostaddrs, nslots);
+	free_sftp_strings(ports, nslots);
+	free_sftp_strings(fpaths, nslots);
+	return 1;
+}
 
 #ifdef GPFXDIST
 /*
@@ -501,6 +819,7 @@ fstream_open(const char *path, const struct fstream_options *options,
 {
 	int i;
 	fstream_t* fs;
+	bool_t is_sftp = FALSE;
 
 	*response_code = 500;
 	*response_string = "Internal Server Error";
@@ -515,27 +834,44 @@ fstream_open(const char *path, const struct fstream_options *options,
 	fs->options = *options;
 	fs->buffer = gfile_malloc(options->bufsize);
 
+	if (strncmp(path, "/glob))
+	else
 	{
-		if (expand_directories(fs))
+		if (glob_path(fs, path))
 		{
 			fstream_close(fs);
 			return 0;
 		}
+
+		/*
+		 * If the list of files in our filestream includes a directory name, expand
+		 * the directory and add all the files inside of it.
+		 */
+		if (fpath_all_directories(&fs->glob))
+		{
+			if (expand_directories(fs))
+			{
+				fstream_close(fs);
+				return 0;
+			}
+		}
 	}
 
 	/*
@@ -543,11 +879,14 @@ fstream_open(const char *path, const struct fstream_options *options,
 	 */
 	if (fs->glob.gl_pathc == 0)
 	{
-		gfile_printf_then_putc_newline("fstream bad path: %s", path);
-		fstream_close(fs);
-		*response_code = 404;
-		*response_string = "No matching file(s) found";
-		return 0;
+		if(!is_sftp)
+		{
+			gfile_printf_then_putc_newline("fstream bad path: %s", path);
+			fstream_close(fs);
+			*response_code = 404;
+			*response_string = "No matching file(s) found";
+			return 0;
+		}
 	}
 
 	if (fs->glob.gl_pathc != 1 && options->forwrite)
@@ -650,13 +989,28 @@ fstream_open(const char *path, const struct fstream_options *options,
 
 		gfile_close(&fs->fd);
 
-		if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync),
-					   response_code, response_string, transform))
+		if (is_sftp)
 		{
-			gfile_printf_then_putc_newline("fstream unable to open file %s",
-					fs->glob.gl_pathv[i]);
-			fstream_close(fs);
-			return 0;
+#ifdef LIBSSH2
+			if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[i], fs->glob.gl_username[i], fs->glob.gl_passwd[i],
+					fs->glob.gl_hostaddr[i], fs->glob.gl_port[i], gfile_open_flags(options->forwrite, options->usesync), response_code, response_string, transform))
+			{
+				gfile_printf_then_putc_newline("fstream unable to open file %s", fs->glob.gl_pathv[i]);
+				fstream_close(fs);
+				return 0;
+			}
+#endif
+		}
+		else
+		{
+			if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync),
+						   response_code, response_string, transform))
+			{
+				gfile_printf_then_putc_newline("fstream unable to open file %s",
+						fs->glob.gl_pathv[i]);
+				fstream_close(fs);
+				return 0;
+			}
 		}
 
 		fs->compressed_size += gfile_get_compressed_size(&fs->fd);
@@ -707,14 +1061,29 @@ static int nextFile(fstream_t*fs)
 	if (fs->fidx < fs->glob.gl_pathc)
 	{
 		fs->skip_header_line = fs->options.header;
-
-		if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ,
-					   &response_code, &response_string, transform))
+		if (fs->fd.is_sftp)
 		{
-			gfile_printf_then_putc_newline("fstream unable to open file %s",
-					fs->glob.gl_pathv[fs->fidx]);
-			fs->ferror = "unable to open file";
-			return 1;
+#ifdef LIBSSH2
+			if (gfile_open_sftp(&fs->fd, fs->glob.gl_pathv[fs->fidx], fs->glob.gl_username[fs->fidx], fs->glob.gl_passwd[fs->fidx],
+					fs->glob.gl_hostaddr[fs->fidx], fs->glob.gl_port[fs->fidx], GFILE_OPEN_FOR_READ, &response_code, &response_string, transform))
+			{
+				gfile_printf_then_putc_newline("fstream unable to open file %s",
+						fs->glob.gl_pathv[fs->fidx]);
+				fs->ferror = "unable to open file";
+				return 1;
+			}
+#endif
+		}
+		else
+		{
+			if (gfile_open(&fs->fd, fs->glob.gl_pathv[fs->fidx], GFILE_OPEN_FOR_READ,
+						   &response_code, &response_string, transform))
+			{
+				gfile_printf_then_putc_newline("fstream unable to open file %s",
+						fs->glob.gl_pathv[fs->fidx]);
+				fs->ferror = "unable to open file";
+				return 1;
+			}
 		}
 	}
 
diff --git a/src/backend/utils/misc/fstream/gfile.c b/src/backend/utils/misc/fstream/gfile.c
index 070ca1c649c..f347105da76 100644
--- a/src/backend/utils/misc/fstream/gfile.c
+++ b/src/backend/utils/misc/fstream/gfile.c
@@ -40,6 +40,14 @@
 #include /* for flock */
 #include
 
+#ifdef LIBSSH2
+#include
+#include
+#endif
+
+#include
+#include
+
 #ifdef WIN32
 #include
 #define snprintf _snprintf
@@ -146,6 +154,101 @@ writewinpipe(gfile_t* fd, void* ptr, size_t size)
 	return i;
 }
 
+#ifdef LIBSSH2
+/*
+ * Read up to size bytes from the open SFTP handle.  Returns the number of
+ * bytes read, 0 at end of file, or a negative libssh2 error code.
+ */
+static ssize_t
+sftp_read(gfile_t *fd, void *ptr, size_t size)
+{
+	ssize_t i = 0;
+
+	/* libssh2 reports failures in its return value, not in errno */
+	do
+		i = libssh2_sftp_read(fd->sftp_handle, ptr, size);
+	while (i == LIBSSH2_ERROR_EAGAIN);
+
+	if (i < 0)
+		gfile_printf_then_putc_newline("sftp read failed: %ld", (long) i);
+
+	if (i > 0)
+		fd->compressed_position += i;
+	return i;
+}
+
+/*
+ * Write size bytes to the open SFTP handle.  libssh2_sftp_write() may accept
+ * fewer bytes than requested, so keep writing until everything is sent.
+ * Returns the number of bytes written, or a negative libssh2 error code.
+ */
+static ssize_t
+sftp_write(gfile_t *fd, void *ptr, size_t size)
+{
+	const char *buf = (const char *) ptr;
+	size_t done = 0;
+
+	while (done < size)
+	{
+		ssize_t i = libssh2_sftp_write(fd->sftp_handle, buf + done, size - done);
+
+		if (i == LIBSSH2_ERROR_EAGAIN)
+			continue;
+		if (i < 0)
+			return i;
+		if (i == 0)
+			break;
+
+		done += i;
+		fd->compressed_position += i;
+	}
+	return (ssize_t) done;
+}
+
+/*
+ * Release the SFTP handle, SFTP session, SSH session and socket held by fd.
+ * Every member is cleared once released, so calling this again is a no-op;
+ * gfile_close() may reach it twice and error paths call it early.
+ */
+static int
+sftp_close(gfile_t *fd)
+{
+	int had_resources = (fd->session != NULL || fd->sock >= 0);
+
+	if (fd->sftp_handle)
+	{
+		libssh2_sftp_close(fd->sftp_handle);
+		fd->sftp_handle = NULL;
+	}
+	if (fd->sftp_session)
+	{
+		libssh2_sftp_shutdown(fd->sftp_session);
+		fd->sftp_session = NULL;
+	}
+	if (fd->session)
+	{
+		libssh2_session_disconnect(fd->session, "Normal Shutdown");
+		libssh2_session_free(fd->session);
+		fd->session = NULL;
+	}
+
+	if (fd->sock >= 0)
+	{
+#ifdef WIN32
+		closesocket(fd->sock);
+#else
+		close(fd->sock);
+#endif
+		fd->sock = -1;
+	}
+
+	/* balance the libssh2_init() made by sftp_open() */
+	if (had_resources)
+		libssh2_exit();
+	return 0;
+}
+#endif
+
 #ifdef HAVE_LIBBZ2
 static void *
 bz_alloc(void *a, int b, int c)
@@ -192,8 +295,17 @@ bz_file_read(gfile_t *fd, void *ptr, size_t len)
 
 		while (z->in_size < sizeof z->in)
 		{
-			s = read_and_retry(fd, z->in + z->in_size, sizeof z->in
-							   - z->in_size);
+			if (fd->is_sftp)
+			{
+#ifdef LIBSSH2
+				gfile_printf_then_putc_newline("sftp_read : Read bz files from an SFTP server");
+				s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size);
+#endif
+			}
+
+			else
+				s = read_and_retry(fd, z->in + z->in_size, sizeof z->in
+								   - z->in_size);
 			if (s == 0)
 				break;
 			if (s < 0)
@@ -306,7 +418,15 @@ gz_file_read(gfile_t* fd, void* ptr, size_t len)
 		 */
 		while (z->in_size < sizeof z->in)
 		{
-			s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size);
+			if (fd->is_sftp)
+			{
+#ifdef LIBSSH2
+				gfile_printf_then_putc_newline("sftp_read : Read gz files from an SFTP server");
+				s = sftp_read(fd, z->in + z->in_size, sizeof z->in - z->in_size);
+#endif
+			}
+			else
+				s = read_and_retry(fd, z->in + z->in_size, sizeof z->in - z->in_size);
 
 			if (s == 0)
 			{
@@ -386,7 +506,19 @@ gz_file_write_one_chunk(gfile_t *fd, int do_flush)
 		}
 		have = COMPRESSION_BUFFER_SIZE - z->s.avail_out;
 
-		if ( write_and_retry(fd, z->out, have) != have )
+		if (fd->is_sftp)
+		{
+#ifdef LIBSSH2
+			if (sftp_write(fd, z->out, have) != have)
+			{
+				gfile_printf_then_putc_newline("failed to sftp write, the stream ends");
+				(void)deflateEnd(&(z->s));
+				ret = -1;
+				break;
+			}
+#endif
+		}
+		else if ( write_and_retry(fd, z->out, have) != have )
 		{
 			/*
 			 * presently gfile_close calls gz_file_close only for the on_write case so we don't need
@@ -1334,23 +1466,33 @@ gfile_close(gfile_t*fd)
 		{
 			fd->close(fd);
 		}
-
-		if (fd->is_win_pipe)
+		if (fd->is_sftp)
 		{
-			fd->close(fd);
+#ifdef LIBSSH2
+			sftp_close(fd);
+#endif
 		}
+
 		else
 		{
-			if(fd->held_pipe_lock)
+			if (fd->is_win_pipe)
+			{
+				fd->close(fd);
+			}
+			else
 			{
+				if(fd->held_pipe_lock)
+				{
 #ifndef WIN32
-				flock (fd->fd.filefd, LOCK_UN);
+					flock (fd->fd.filefd, LOCK_UN);
 #endif
+				}
+				ret = close_filefd(fd->fd.filefd);
+				if (ret == -1)
+					ret = 1;
 			}
-			ret = close_filefd(fd->fd.filefd);
-			if (ret == -1)
-				ret = 1;
 		}
+
 	}
 	fd->read = 0;
 	fd->close = 0;
@@ -1358,6 +1500,110 @@ gfile_close(gfile_t*fd)
 	return ret;
 }
 
+#ifdef LIBSSH2
+/*
+ * Open fpath on the SFTP server at sftp_hostaddr/sftp_port, logging in as
+ * sftp_uname with sftp_passwd, and install the SFTP read/write/close
+ * callbacks before handing .gz/.bz2 files to gz_file_open()/bz_file_open().
+ *
+ * Returns 0 on success and 1 on failure; unsupported file types also set
+ * *response_code and *response_string.
+ */
+int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr,
+					const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform)
+{
+	const char *s = strrchr(fpath, '.');
+
+	//struct stat sta;
+	LIBSSH2_SFTP_ATTRIBUTES sta;
+	memset(&sta, 0, sizeof(sta));
+	memset(fd, 0, sizeof *fd);
+	fd->is_sftp = TRUE;
+	fd->sock = -1;				/* 0 is a valid descriptor, so mark "no socket" */
+
+	if (flags != GFILE_OPEN_FOR_READ)
+	{
+		fd->is_write = TRUE;
+	}
+
+	int ans = sftp_open(fd, fpath, sftp_uname, sftp_passwd, sftp_hostaddr, sftp_port);
+	if (ans == 0)
+	{
+		gfile_printf_then_putc_newline("looks like a ftp handle");
+		gfile_printf_then_putc_newline("path is %s", fpath);
+	}
+	else
+	{
+		gfile_printf_then_putc_newline("failed open a ftp handle, please check sftp-information: ip, port, passwd and filename");
+		/* release whatever sftp_open() managed to set up before it failed */
+		sftp_free(fd);
+		return 1;
+	}
+
+	if (flags == GFILE_OPEN_FOR_READ)
+	{
+		if (0 != libssh2_sftp_stat(fd->sftp_session, fpath, &sta))
+		{
+			gfile_printf_then_putc_newline("libssh2 libssh2_sftp_stat failed");
+			sftp_free(fd);
+			return 1;
+		}
+	}
+
+	fd->compressed_size = sta.filesize;
+
+	fd->read = sftp_read;
+	fd->write = sftp_write;
+	fd->close = sftp_close;
+
+	/*
+	 * delegate remaining setup work to an appropriate open routine
+	 * or return an error if we can't handle the type
+	 */
+	if (s && strcasecmp(s, ".gz") == 0)
+	{
+#ifndef HAVE_LIBZ
+		gfile_printf_then_putc_newline(".gz not supported");
+#else
+		/*
+		 * flag used by function gfile close
+		 */
+		fd->compression = GZ_COMPRESSION;
+
+		if (flags != GFILE_OPEN_FOR_READ)
+		{
+			fd->is_write = TRUE;
+		}
+
+		return gz_file_open(fd);
+#endif
+	}
+	else if (s && strcasecmp(s, ".bz2") == 0)
+	{
+#ifndef HAVE_LIBBZ2
+		gfile_printf_then_putc_newline(".bz2 not supported");
+#else
+		fd->compression = BZ_COMPRESSION;
+		if (flags != GFILE_OPEN_FOR_READ)
+			gfile_printf_then_putc_newline(".bz2 not yet supported for writable tables");
+
+		return bz_file_open(fd);
+#endif
+	}
+	else if (s && strcasecmp(s, ".z") == 0)
+		gfile_printf_then_putc_newline("gfile compression .z file is not supported");
+	else if (s && strcasecmp(s, ".zip") == 0)
+		gfile_printf_then_putc_newline("gfile compression zip is not supported");
+	else
+		return 0;
+
+	*response_code = 415;
+	*response_string = "Unsupported File Type";
+
+	return 1;
+}
+#endif
+
 ssize_t
 gfile_read(gfile_t *fd, void *ptr, size_t len)
 {
@@ -1407,3 +1653,167 @@ off_t gfile_get_compressed_position(gfile_t *fd)
 {
 	return fd->compressed_position;
 }
+
+#ifdef LIBSSH2
+/*
+ * Connect to sftp_hostaddr:sftp_port (an IPv4 or IPv6 literal), authenticate
+ * as sftp_uname with sftp_passwd and open fpath: for writing when
+ * fd->is_write is set, otherwise for reading.
+ *
+ * Returns 0 on success, -1 if the connection or SSH handshake fails and -2
+ * if authentication, SFTP start-up or opening the file fails.  On failure
+ * everything acquired so far has already been released.
+ */
+int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr,
+			  const char *sftp_port)
+{
+	int rc;
+	char *userauthlist;
+	unsigned int port = 0;
+	char trailing;
+	struct sockaddr_in sin;
+	struct sockaddr_in6 sin6;
+	int is_ipv6 = (strchr(sftp_hostaddr, ':') != NULL);
+
+	fd->sock = -1;
+
+	memset(&sin, 0, sizeof(sin));
+	memset(&sin6, 0, sizeof(sin6));
+	if (is_ipv6)
+		rc = inet_pton(AF_INET6, sftp_hostaddr, &sin6.sin6_addr);
+	else
+		rc = inet_pton(AF_INET, sftp_hostaddr, &sin.sin_addr);
+	if (rc != 1)
+	{
+		gfile_printf_then_putc_newline("invalid sftp host address: %s", sftp_hostaddr);
+		return -1;
+	}
+
+	/* accept only a plain decimal port in the range 1..65535 */
+	if (sscanf(sftp_port, "%u%c", &port, &trailing) != 1 || port == 0 || port > 65535)
+	{
+		gfile_printf_then_putc_newline("invalid sftp port: %s", sftp_port);
+		return -1;
+	}
+
+	rc = libssh2_init(0);
+
+	if (rc != 0)
+	{
+		gfile_printf_then_putc_newline("libssh2 initialization failed (%d)\n", rc);
+		return -1;
+	}
+
+	fd->sock = socket(is_ipv6 ? AF_INET6 : AF_INET, SOCK_STREAM, 0);
+	if (fd->sock < 0)
+	{
+		gfile_printf_then_putc_newline("failed to create socket!\n");
+		fd->sock = -1;
+		libssh2_exit();
+		return -1;
+	}
+
+	if (is_ipv6)
+	{
+		sin6.sin6_family = AF_INET6;
+		sin6.sin6_port = htons((uint16_t) port);
+		rc = connect(fd->sock, (struct sockaddr *)(&sin6), sizeof(struct sockaddr_in6));
+	}
+	else
+	{
+		sin.sin_family = AF_INET;
+		sin.sin_port = htons((uint16_t) port);
+		rc = connect(fd->sock, (struct sockaddr *)(&sin), sizeof(struct sockaddr_in));
+	}
+	if (rc != 0)
+	{
+		gfile_printf_then_putc_newline("failed to connect!\n");
+		sftp_close(fd);
+		return -1;
+	}
+
+	fd->session = libssh2_session_init();
+	if (!fd->session)
+	{
+		sftp_close(fd);
+		return -1;
+	}
+
+	/* the socket is left in blocking mode, so run the session blocking as well */
+	libssh2_session_set_blocking(fd->session, 1);
+
+	/* ... start it up. This will trade welcome banners, exchange keys,
+	 * and setup crypto, compression, and MAC layers
+	 */
+	rc = libssh2_session_handshake(fd->session, fd->sock);
+	if (rc)
+	{
+		gfile_printf_then_putc_newline("Failure establishing SSH session: %d\n", rc);
+		sftp_close(fd);
+		return -1;
+	}
+
+	/* check what authentication methods are available */
+	userauthlist = libssh2_userauth_list(fd->session, sftp_uname, strlen(sftp_uname));
+	if (userauthlist == NULL)
+	{
+		/* NULL is an error unless the server accepted "none" authentication */
+		if (!libssh2_userauth_authenticated(fd->session))
+		{
+			gfile_printf_then_putc_newline("Unable to list authentication methods.\n");
+			sftp_close(fd);
+			return -2;
+		}
+	}
+	else if (strstr(userauthlist, "password") == NULL)
+	{
+		gfile_printf_then_putc_newline("Password authentication is not supported by the server.\n");
+		sftp_close(fd);
+		return -2;
+	}
+	else if (libssh2_userauth_password(fd->session, sftp_uname, sftp_passwd))
+	{
+		gfile_printf_then_putc_newline("Authentication by password failed.\n");
+		sftp_close(fd);
+		return -2;
+	}
+
+	gfile_printf_then_putc_newline("libssh2_sftp_init()!\n");
+	fd->sftp_session = libssh2_sftp_init(fd->session);
+
+	if (!(fd->sftp_session))
+	{
+		gfile_printf_then_putc_newline("Unable to init SFTP session\n");
+		sftp_close(fd);
+		return -2;
+	}
+
+	if (fd->is_write)
+	{
+		fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath,
+											LIBSSH2_FXF_WRITE | LIBSSH2_FXF_CREAT | LIBSSH2_FXF_TRUNC,
+											LIBSSH2_SFTP_S_IRUSR | LIBSSH2_SFTP_S_IWUSR |
+											LIBSSH2_SFTP_S_IRGRP | LIBSSH2_SFTP_S_IROTH);
+	}
+	else
+	{
+		fd->sftp_handle = libssh2_sftp_open(fd->sftp_session, fpath, LIBSSH2_FXF_READ, 0);
+	}
+
+	if (!(fd->sftp_handle))
+	{
+		gfile_printf_then_putc_newline("Unable to open file with SFTP: %lu\n",
+					libssh2_sftp_last_error(fd->sftp_session));
+		sftp_close(fd);
+		return -2;
+	}
+
+	return 0;
+}
+
+/* Release every libssh2 resource held by fd; a no-op when nothing is open. */
+void sftp_free(gfile_t *fd)
+{
+	sftp_close(fd);
+}
+#endif
diff --git a/src/bin/gpfdist/Makefile b/src/bin/gpfdist/Makefile
index 3739e3835c5..cb84109b309 100644
--- a/src/bin/gpfdist/Makefile
+++ b/src/bin/gpfdist/Makefile
@@ -25,7 +25,7 @@ ifeq ($(PORTNAME),win32)
 OBJS += $(top_builddir)/src/port/glob.o
 endif
 
-LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs)
+LDLIBS += $(LIBS) $(GPFDIST_LIBS) $(apr_link_ld_libs) -lssh2
 
 all: gpfdist$(X)
 
diff --git a/src/bin/gpfdist/gpfdist.c b/src/bin/gpfdist/gpfdist.c
index ad7fb868313..826ec2569b6 100644
--- a/src/bin/gpfdist/gpfdist.c
+++ b/src/bin/gpfdist/gpfdist.c
@@ -71,9 +71,15 @@
 #include
 #endif
 
+#include
+#include
+#include
+
 #define DEFAULT_COMPRESS_LEVEL 3
 #define MAX_FRAME_SIZE 65536
 
+#define MAX_GPFDIST_LOGSIZE (512 * 1024 * 1024) // 512MB
+
 /* A data block */
 typedef struct blockhdr_t blockhdr_t;
 struct blockhdr_t
@@ -361,6 +367,7 @@ struct request_t
 static int ggetpid();
 static void log_gpfdist_status();
 static void log_request_header(const request_t *r);
+static void log_aging_gpfdist();
 
 static void gprint(const request_t *r, const char* fmt, ...)
 pg_attribute_printf(2, 3);
@@ -1274,6 +1281,10 @@ static void request_end(request_t* r, int error, const char* errmsg, int sendhea
 static int local_send(request_t *r, const char* buf, int buflen)
 {
 	int n = gpfdist_send(r, buf, buflen);
+
+	int is_sftp_type = 0;
+	if(strncmp(r->path, "/session;
 	retblock->bot = retblock->top = 0;
@@ -1663,20 +1674,36 @@ static void sessions_cleanup(void)
 static int session_attach(request_t* r)
 {
 	char key[1024];
+	char tmp_key[1024];
 	session_t* session = NULL;
-
+	int is_sftp_mode = 0;
 	/*
 	 * create the session key (tid:path)
 	 */
-	if (sizeof(key) - 1 == apr_snprintf(key, sizeof(key), "%s:%s",
-										r->tid, r->path))
+	if (strncmp(r->path, "/tid) + strlen(r->path) >= 1024)
+		{
+			gwarning(NULL, "sftp_path is too long");
+			request_end(r, 1, 0, 0);
+			return -1;
+		}
 	}
+	else
+	{
+		if (sizeof(tmp_key) - 1 == apr_snprintf(tmp_key, sizeof(tmp_key), "%s:%s",
+												r->tid, r->path))
+		{
+			http_error(r, FDIST_BAD_REQUEST, "path too long");
+			request_end(r, 1, 0, 0);
+			return -1;
+		}
+	}
+	apr_snprintf(key, sizeof(key), "%s:%s", r->tid, r->path);
 
 	/* check if such session already exists in hashtable */
 	session = apr_hash_get(gcb.session.tab, key, APR_HASH_KEY_STRING);
@@ -1770,7 +1797,8 @@ static int session_attach(request_t* r)
 			fstream_options.transform->errfile = r->trans.errfile;
 			fstream_options.transform->stderr_server = r->trans.stderr_server;
 		}
-		gprintlnif(r, "r->path %s", r->path);
+		if (is_sftp_mode == 0)
+			gprintlnif(r, "r->path %s", r->path);
 #endif
 
 		/* try opening the fstream */
@@ -2058,6 +2086,65 @@ static void log_request_header(const request_t *r)
 		gprintln(r, "%s:%s", r->in.req->hname[i], r->in.req->hvalue[i]);
 }
 
+/*
+ * Rotate the gpfdist log file.  Once the file named by opt.l has reached
+ * MAX_GPFDIST_LOGSIZE it is renamed to "<name>.old", replacing an earlier
+ * one, and stderr/stdout are reopened on a fresh file with the old name.
+ */
+static void log_aging_gpfdist()
+{
+	struct stat filestats;		// Structure to hold file statistics
+	char newfilename[256];		// Buffer to store the new filename
+	FILE *f_stderr;
+	FILE *f_stdout;
+
+	/* nothing to do unless we log to a file and it has grown too large */
+	if (opt.l == NULL)
+		return;
+	if (stat(opt.l, &filestats) != 0 || filestats.st_size < MAX_GPFDIST_LOGSIZE)
+		return;
+
+	/* the new name needs room for ".old" and the terminating NUL */
+	if (strlen(opt.l) + sizeof(".old") > sizeof(newfilename))
+	{
+		fprintf(stderr, "log file name is too long. please change log name under log_aging!\n");
+		exit(1);
+	}
+
+	snprintf(newfilename, sizeof(newfilename), "%s.old", opt.l);
+
+	/* drop a previous ".old" file; rename() onto an existing file is not portable */
+	if (stat(newfilename, &filestats) == 0)
+		remove(newfilename);
+
+	if (rename(opt.l, newfilename) != 0)
+	{
+		fprintf(stderr, "failed to rename log to %s: %s under log_aging.\n", newfilename, strerror(errno));
+		return;
+	}
+
+	/* Redirect stderr and stdout to the log file */
+	f_stderr = freopen(opt.l, "a", stderr);
+	if (f_stderr == NULL)
+	{
+		fprintf(stderr, "failed to redirect stderr to log: %s under log_aging.\n", strerror(errno));
+		exit(1);
+	}
+#ifndef WIN32
+	setlinebuf(stderr);
+#endif
+
+	f_stdout = freopen(opt.l, "a", stdout);
+	if (f_stdout == NULL)
+	{
+		fprintf(stderr, "failed to redirect stdout to log: %s under log_aging.\n", strerror(errno));
+		exit(1);
+	}
+#ifndef WIN32
+	setlinebuf(stdout);
+#endif
+}
+
 /*
  * do_read_request
  *
@@ -2070,6 +2157,7 @@ static void do_read_request(int fd, short event, void* arg)
 	char* p = NULL;
 	char* pp = NULL;
 	char* path = NULL;
+	bool is_sftp_type = false;
 
 	/* If we timeout, close the request. */
 	if (event & EV_TIMEOUT)
@@ -2209,6 +2297,11 @@ static void do_read_request(int fd, short event, void* arg)
 		/* we forced in a filename with the hidden -f option. use it */
 		r->path = opt.f;
 	}
+	else if (!strncmp(path, "/path = apr_psprintf(r->pool, "%s", path);
+	}
 	else
 	{
 		if(request_set_path(r, opt.d, p, pp, path) != 0)
@@ -4677,6 +4770,8 @@ static void do_close(int fd, short event, void *arg)
 		apr_pool_destroy(r->pool);
 
 	fflush(stdout);
+
+	log_aging_gpfdist();
 }
 
 /*
diff --git a/src/common/Makefile b/src/common/Makefile
index 4549e6a24fb..469190571a8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -42,6 +42,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\""
 
 override CPPFLAGS := -DFRONTEND -I. -I$(top_srcdir)/src/common $(CPPFLAGS)
 LIBS += $(PTHREAD_LIBS)
+LIBS += -lssh2
 
 # If you add objects here, see also src/tools/msvc/Mkvcbuild.pm
 
diff --git a/src/include/cdb/cdbsreh.h b/src/include/cdb/cdbsreh.h
index fdb29716d3f..97ece750a1e 100644
--- a/src/include/cdb/cdbsreh.h
+++ b/src/include/cdb/cdbsreh.h
@@ -100,5 +100,6 @@ extern Datum gp_truncate_error_log(PG_FUNCTION_ARGS);
 extern Datum gp_read_persistent_error_log(PG_FUNCTION_ARGS);
 extern Datum gp_truncate_persistent_error_log(PG_FUNCTION_ARGS);
 
+extern int get_sftpfile_numbers(const char *path);
 
 #endif /* CDBSREH_H */
diff --git a/src/include/fstream/fstream.h b/src/include/fstream/fstream.h
index 2c848490b88..e490868e4bc 100644
--- a/src/include/fstream/fstream.h
+++ b/src/include/fstream/fstream.h
@@ -23,6 +23,12 @@ typedef struct
 {
 	int gl_pathc;
 	char** gl_pathv;
+	char** gl_username;
+	char** gl_passwd;
+	char** gl_keyfile1;
+	char** gl_keyfile2;
+	char** gl_hostaddr;
+	char** gl_port;
 } glob_and_copy_t;
 
 struct fstream_options{
@@ -85,4 +91,7 @@ int fstream_close_with_error(fstream_t* fs, char* msg);
 void fstream_close(fstream_t* fs);
 bool_t fstream_is_win_pipe(fstream_t *fs);
 
+int ParseFilePathUri(char *uri_str, sftp_info_t *info);
+int get_sftp_counts(const char *sftp_request);
+
 #endif
diff --git a/src/include/fstream/gfile.h b/src/include/fstream/gfile.h
index 565a7d910c9..0bc01f644e9 100644
--- a/src/include/fstream/gfile.h
+++ b/src/include/fstream/gfile.h
@@ -12,6 +12,11 @@
 #include
 #endif
 
+#ifdef LIBSSH2
+#include
+#include
+#endif
+
 #ifdef WIN32
 #include
 #endif
@@ -57,6 +62,7 @@ typedef struct gfile_t
 	off_t compressed_size,compressed_position;
 	bool_t is_win_pipe;
 	bool_t held_pipe_lock; /* Whether held flock on pipe file, used to restrict only one reader of pipe */
+	bool_t is_sftp;
 
 	union
 	{
@@ -66,6 +72,13 @@ typedef struct gfile_t
 #endif
 	} fd;
 
+#ifdef LIBSSH2
+	LIBSSH2_SESSION *session;
+	LIBSSH2_SFTP *sftp_session;
+	LIBSSH2_SFTP_HANDLE *sftp_handle;
+	int sock;
+#endif
+
 	union
 	{
 		int txt;
@@ -85,6 +98,19 @@ typedef struct gfile_t
 	struct gpfxdist_t* transform;
 } gfile_t;
 
+/* Struct of sftp info */
+typedef struct sftp_info_t sftp_info_t;
+struct sftp_info_t
+{
+	char username[32];
+	char password[64];
+	char keyfile1[64];
+	char keyfile2[64];
+	char hostaddr[64];
+	char port[16];
+	char fpath[256];
+};
+
 /*
  * MPP-13817 (support opening files without O_SYNC)
  */
@@ -103,4 +129,11 @@ void gfile_printf_then_putc_newline(const char*format,...) pg_attribute_printf(1
 void*gfile_malloc(size_t size);
 void gfile_free(void*a);
 
+#ifdef LIBSSH2
+int gfile_open_sftp(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr,
+					const char *sftp_port, int flags, int *response_code, const char **response_string, struct gpfxdist_t *transform);
+extern int sftp_open(gfile_t *fd, const char *fpath, const char *sftp_uname, const char *sftp_passwd, const char *sftp_hostaddr,
+					 const char *sftp_port);
+extern void sftp_free(gfile_t *fd);
+#endif
 #endif