diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h index a3b185392c76c..6961f30017220 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -20,6 +20,7 @@ #include "libhdfspp/options.h" #include "libhdfspp/status.h" +#include "common/cancelable.h" #include #include @@ -54,9 +55,9 @@ class IoService { }; /** - * Applications opens an InputStream to read files in HDFS. + * Applications opens a FileHandle to read files in HDFS. **/ -class InputStream { +class FileHandle { public: /** * Read data from a specific position. The current implementation @@ -71,12 +72,13 @@ class InputStream { * The handler returns the datanode that serves the block and the number of * bytes has read. **/ - virtual void + virtual CancelHandle PositionRead(void *buf, size_t nbyte, uint64_t offset, - const std::set &excluded_datanodes, - const std::function &handler) = 0; - virtual ~InputStream(); + const std::function &handler) = 0; + + virtual size_t PositionRead(void *buf, size_t nbyte, off_t offset) = 0; + + virtual ~FileHandle(); }; /** @@ -93,6 +95,12 @@ class FileSystem { New(IoService *io_service, const Options &options, const std::string &server, const std::string &service, const std::function &handler); + + /* Synchronous call of New*/ + static FileSystem * + New(IoService *io_service, const Options &options, const std::string &server, + const std::string &service); + /** * Open a file on HDFS. The call issues an RPC to the NameNode to * gather the locations of all blocks in the file and to return a @@ -100,8 +108,11 @@ class FileSystem { **/ virtual void Open(const std::string &path, - const std::function &handler) = 0; - virtual ~FileSystem(); + const std::function &handler) = 0; + virtual Status Open(const std::string &path, FileHandle **handle); + + virtual ~FileSystem() {}; + }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt index 434dc4e8eff8d..c8515979d75ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt @@ -21,4 +21,5 @@ add_subdirectory(fs) add_subdirectory(reader) add_subdirectory(rpc) add_subdirectory(proto) +add_subdirectory(connection) add_subdirectory(bindings) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt index e170370c92040..664518ac684ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt @@ -16,5 +16,5 @@ # under the License. -add_library(bindings_c hdfs.cc hdfs_cpp.cc) +add_library(bindings_c hdfs.cc) add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 9b985a932ad02..cd3af77c2cd4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -16,8 +16,10 @@ * limitations under the License. */ -#include "hdfs_cpp.h" +#include "fs/filesystem.h" +#include +#include #include #include @@ -25,15 +27,15 @@ using namespace hdfs; /* Seperate the handles used by the C api from the C++ API*/ struct hdfs_internal { - hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {} - hdfs_internal(std::unique_ptr p) + hdfs_internal(FileSystem *p) : filesystem_(p) {} + hdfs_internal(std::unique_ptr p) : filesystem_(std::move(p)) {} virtual ~hdfs_internal(){}; - HadoopFileSystem *get_impl() { return filesystem_.get(); } - const HadoopFileSystem *get_impl() const { return filesystem_.get(); } + FileSystem *get_impl() { return filesystem_.get(); } + const FileSystem *get_impl() const { return filesystem_.get(); } private: - std::unique_ptr filesystem_; + std::unique_ptr filesystem_; }; struct hdfsFile_internal { @@ -65,17 +67,17 @@ static void ReportError(int errnum, std::string msg) { int hdfsFileIsOpenForRead(hdfsFile file) { /* files can only be open for reads at the moment, do a quick check */ if (file) { - return file->get_impl()->IsOpenForRead(); + return true; // Update implementation when we get file writing } return false; } hdfsFS hdfsConnect(const char *nn, tPort port) { - HadoopFileSystem *fs = new HadoopFileSystem(); - Status stat = fs->Connect(nn, port); - if (!stat.ok()) { + std::string port_as_string = std::to_string(port); + IoService * io_service = IoService::New(); + FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string); + if (!fs) { ReportError(ENODEV, "Unable to connect to NameNode."); - delete fs; return nullptr; } return new hdfs_internal(fs); @@ -102,7 +104,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, return nullptr; } FileHandle *f = nullptr; - Status stat = fs->get_impl()->OpenFileForRead(path, &f); + Status stat = fs->get_impl()->Open(path, &f); if (!stat.ok()) { return nullptr; } @@ -133,5 +135,5 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, return -1; } - return file->get_impl()->Pread(buffer, length, position); + return file->get_impl()->PositionRead(buffer, length, position); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc deleted file mode 100644 index 3f3fb8dc3d190..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs_cpp.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include "libhdfspp/hdfs.h" -#include "libhdfspp/status.h" -#include "fs/filesystem.h" -#include "common/hdfs_public_api.h" - -namespace hdfs { - -ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) { - auto stat = std::make_shared>(); - std::future future(stat->get_future()); - - /* wrap async call with promise/future to make it blocking */ - size_t read_count = 0; - auto callback = [stat, &read_count](const Status &s, const std::string &dn, - size_t bytes) { - (void)dn; - stat->set_value(s); - read_count = bytes; - }; - - input_stream_->PositionRead(buf, nbyte, offset, std::set(), - callback); - - /* wait for async to finish */ - auto s = future.get(); - - if (!s.ok()) { - return -1; - } - return (ssize_t)read_count; -} - -bool FileHandle::IsOpenForRead() { - /* for now just check if InputStream exists */ - if (!input_stream_) { - return false; - } - return true; -} - -HadoopFileSystem::~HadoopFileSystem() { - /** - * Note: IoService must be stopped before getting rid of worker threads. - * Once worker threads are joined and deleted the service can be deleted. - **/ - - file_system_.reset(nullptr); - service_->Stop(); - worker_threads_.clear(); - service_.reset(nullptr); -} - -Status HadoopFileSystem::Connect(const char *nn, tPort port, - unsigned int threads) { - /* IoService::New can return nullptr */ - if (!service_) { - return Status::Error("Null IoService"); - } - /* spawn background threads for asio delegation */ - for (unsigned int i = 0; i < threads; i++) { - AddWorkerThread(); - } - /* synchronized */ - FileSystem *fs = nullptr; - auto stat = std::make_shared>(); - std::future future = stat->get_future(); - - auto callback = [stat, &fs](const Status &s, FileSystem *f) { - fs = f; - stat->set_value(s); - }; - - /* dummy options object until this is hooked up to HDFS-9117 */ - Options options_object; - FileSystem::New(service_.get(), options_object, nn, std::to_string(port), - callback); - - /* block until promise is set */ - auto s = future.get(); - - /* check and see if it worked */ - if (!fs) { - service_->Stop(); - worker_threads_.clear(); - return s; - } - - file_system_ = std::unique_ptr(fs); - return s; -} - -int HadoopFileSystem::AddWorkerThread() { - auto service_task = [](IoService *service) { service->Run(); }; - worker_threads_.push_back( - WorkerPtr(new std::thread(service_task, service_.get()))); - return worker_threads_.size(); -} - -Status HadoopFileSystem::OpenFileForRead(const std::string &path, - FileHandle **handle) { - auto stat = std::make_shared>(); - std::future future = stat->get_future(); - - /* wrap async FileSystem::Open with promise to make it a blocking call */ - InputStream *input_stream = nullptr; - auto h = [stat, &input_stream](const Status &s, InputStream *is) { - stat->set_value(s); - input_stream = is; - }; - - file_system_->Open(path, h); - - /* block until promise is set */ - auto s = future.get(); - - if (!s.ok()) { - delete input_stream; - return s; - } - if (!input_stream) { - return s; - } - - *handle = new FileHandle(input_stream); - return s; -} -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h deleted file mode 100644 index 5822ff4c7b1f9..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H -#define LIBHDFSPP_BINDINGS_HDFSCPP_H - -#include -#include -#include - -#include "libhdfspp/hdfs.h" -#include - -namespace hdfs { - -/** - * Implement a very simple 'it just works' interface in C++ - * that provides posix-like file operations + extra stuff for hadoop. - * Then provide very thin C wrappers over each method. - */ - -class FileHandle { - public: - virtual ~FileHandle(){}; - ssize_t Pread(void *buf, size_t nbyte, off_t offset); - bool IsOpenForRead(); - - private: - /* handle should only be created by fs */ - friend class HadoopFileSystem; - FileHandle(InputStream *is) : input_stream_(is){}; - std::unique_ptr input_stream_; -}; - -class HadoopFileSystem { - public: - HadoopFileSystem() : service_(IoService::New()) {} - virtual ~HadoopFileSystem(); - - /* attempt to connect to namenode, return false on failure */ - Status Connect(const char *nn, tPort port, unsigned int threads = 1); - - /* how many worker threads are servicing asio requests */ - int WorkerThreadCount() { return worker_threads_.size(); } - - /* add a new thread to handle asio requests, return number of threads in pool - */ - int AddWorkerThread(); - - Status OpenFileForRead(const std::string &path, FileHandle **handle); - - private: - std::unique_ptr service_; - /* std::thread needs to join before deletion */ - struct WorkerDeleter { - void operator()(std::thread *t) { - t->join(); - delete t; - } - }; - typedef std::unique_ptr WorkerPtr; - std::vector worker_threads_; - std::unique_ptr file_system_; -}; -} - -#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h new file mode 100644 index 0000000000000..80006473a7851 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_COMMON_ASYNC_STREAM_H_ +#define LIB_COMMON_ASYNC_STREAM_H_ + +#include "common/cancelable.h" +#include + +namespace hdfs { + +typedef asio::mutable_buffers_1 MutableBuffers; +typedef asio::const_buffers_1 ConstBuffers; + +/* + * asio-compatible stream implementation. + * + * Lifecycle: should be managed using std::shared_ptr so the object can be + * handed from consumer to consumer + * Threading model: async_read_some and async_write_some are not thread-safe. + * Cancel() can be called from any context + */ +class AsyncStream : public Cancelable { +public: + virtual void async_read_some(const MutableBuffers &buf, + std::function handler) = 0; + + virtual void async_write_some(const ConstBuffers &buf, + std::function handler) = 0; +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancelable.h similarity index 51% rename from hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc rename to hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancelable.h index b47dcb1a8fad2..7cd1cfa626032 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancelable.h @@ -15,32 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#ifndef LIB_COMMON_CANCELABLE_H_ +#define LIB_COMMON_CANCELABLE_H_ -#include "filesystem.h" +#include namespace hdfs { -using ::hadoop::hdfs::LocatedBlocksProto; +class Cancelable { +public: + virtual void cancel() = 0; +}; -InputStream::~InputStream() {} +class CancelHandle : public Cancelable { +public: + CancelHandle(std::shared_ptr target) : target_(target) {} -InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, - const LocatedBlocksProto *blocks) - : fs_(fs), file_length_(blocks->filelength()) { - for (const auto &block : blocks->blocks()) { - blocks_.push_back(block); - } + void cancel() override { if (target_) target_->cancel(); } +private: + std::shared_ptr target_; +}; - if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { - blocks_.push_back(blocks->lastblock()); - } } -void InputStreamImpl::PositionRead( - void *buf, size_t nbyte, uint64_t offset, - const std::set &excluded_datanodes, - const std::function - &handler) { - AsyncPreadSome(offset, asio::buffer(buf, nbyte), excluded_datanodes, handler); -} -} +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h index 5630934ed0105..a5a0446bca8dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -29,7 +29,9 @@ #include namespace hdfs { -namespace continuation { +namespace asio_continuation { + +using namespace continuation; template class ReadContinuation : public Continuation { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h index 08caf0ddbf78f..54caeed2eb2ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -33,7 +33,7 @@ namespace continuation { template struct ReadDelimitedPBMessageContinuation : public Continuation { - ReadDelimitedPBMessageContinuation(Stream *stream, + ReadDelimitedPBMessageContinuation(std::shared_ptr stream, ::google::protobuf::MessageLite *msg) : stream_(stream), msg_(msg) {} @@ -56,8 +56,8 @@ struct ReadDelimitedPBMessageContinuation : public Continuation { } next(status); }; - asio::async_read( - *stream_, asio::buffer(buf_), + asio::async_read(*stream_, + asio::buffer(buf_), std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, std::placeholders::_1, std::placeholders::_2), handler); @@ -82,14 +82,14 @@ struct ReadDelimitedPBMessageContinuation : public Continuation { return offset ? len + offset - transferred : 1; } - Stream *stream_; + std::shared_ptr stream_; ::google::protobuf::MessageLite *msg_; std::array buf_; }; template struct WriteDelimitedPBMessageContinuation : Continuation { - WriteDelimitedPBMessageContinuation(Stream *stream, + WriteDelimitedPBMessageContinuation(std::shared_ptr stream, const google::protobuf::MessageLite *msg) : stream_(stream), msg_(msg) {} @@ -101,28 +101,25 @@ struct WriteDelimitedPBMessageContinuation : Continuation { pbio::CodedOutputStream os(&ss); os.WriteVarint32(size); msg_->SerializeToCodedStream(&os); - write_coroutine_ = - std::shared_ptr(Write(stream_, asio::buffer(buf_))); - write_coroutine_->Run([next](const Status &stat) { next(stat); }); + asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } ); } private: - Stream *stream_; + std::shared_ptr stream_; const google::protobuf::MessageLite *msg_; std::string buf_; - std::shared_ptr write_coroutine_; }; template static inline Continuation * -ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { +ReadDelimitedPBMessage(std::shared_ptr stream, ::google::protobuf::MessageLite *msg) { return new ReadDelimitedPBMessageContinuation(stream, msg); } template static inline Continuation * -WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { +WriteDelimitedPBMessage(std::shared_ptr stream, ::google::protobuf::MessageLite *msg) { return new WriteDelimitedPBMessageContinuation(stream, msg); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index ff9f36c8891cf..b93ce9d69e780 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -20,7 +20,11 @@ #include "libhdfspp/status.h" + +#include + #include +#include #include #include @@ -53,6 +57,19 @@ static inline void ReadDelimitedPBMessage( std::string Base64Encode(const std::string &src); +static inline std::string GetRandomClientName() { + unsigned char buf[6] = { + 0, + }; + RAND_pseudo_bytes(buf, sizeof(buf)); + + std::stringstream ss; + ss << "libhdfs++_" + << Base64Encode(std::string(reinterpret_cast(buf), sizeof(buf))); + return ss.str(); +} + + } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt new file mode 100644 index 0000000000000..54ba96c6b825f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt @@ -0,0 +1,2 @@ +add_library(connection datanodeconnection.cc) +add_dependencies(connection proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc new file mode 100644 index 0000000000000..1b2cc3fd0288a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "datanodeconnection.h" +#include "common/util.h" + +namespace hdfs { + +void DataNodeConnectionImpl::Connect( + std::function dn)> handler) { + // Keep the DN from being freed until we're done + auto shared_this = shared_from_this(); + asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(), + [shared_this, handler](const asio::error_code &ec, std::array::iterator it) { + (void)it; + handler(ToStatus(ec), shared_this); }); +} + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h new file mode 100644 index 0000000000000..03bc81f200e66 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ +#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ + +#include "common/hdfs_public_api.h" +#include "common/async_stream.h" +#include "ClientNamenodeProtocol.pb.h" + +#include "asio.hpp" + +namespace hdfs { + +class DataNodeConnection : public AsyncStream { +public: + std::string uuid_; + std::unique_ptr token_; + + virtual void Connect(std::function dn)> handler) = 0; +}; + + +class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this{ +public: + std::unique_ptr conn_; + std::array endpoints_; + std::string uuid_; + + + DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, + const hadoop::common::TokenProto *token) { + using namespace ::asio::ip; + + conn_.reset(new tcp::socket(*io_service)); + auto datanode_addr = dn_proto.id(); + endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()), + datanode_addr.xferport()); + uuid_ = dn_proto.id().datanodeuuid(); + + if (token) { + token_.reset(new hadoop::common::TokenProto()); + token_->CheckTypeAndMergeFrom(*token); + } + } + + void Connect(std::function dn)> handler) override; + + void async_read_some(const MutableBuffers &buf, + std::function handler) override { + conn_->async_read_some(buf, handler); + }; + + void async_write_some(const ConstBuffers &buf, + std::function handler) override { + conn_->async_write_some(buf, handler); + } + + void cancel() override { + // Can't portably use asio::cancel; see + // http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/basic_stream_socket/cancel/overload2.html + // + // When we implement connection re-use, we need to be sure to throw out + // connections on error + conn_->close(); + } +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt index f386688ab169b..8344022ead376 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -1,2 +1,2 @@ -add_library(fs filesystem.cc inputstream.cc) +add_library(fs filesystem.cc filehandle.cc) add_dependencies(fs proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc new file mode 100644 index 0000000000000..d93e82261385d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filehandle.h" +#include "common/continuation/continuation.h" +#include "connection/datanodeconnection.h" +#include "reader/block_reader.h" + +#include "ClientNamenodeProtocol.pb.h" + +#include + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +FileHandle::~FileHandle() {} + +FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr file_info) + : io_service_(io_service), client_name_(client_name), file_info_(file_info) { +} + + + + +CancelHandle FileHandleImpl::PositionRead( + void *buf, size_t nbyte, uint64_t offset, + const std::function + &handler) { + + // This is where retry and dead DN node elision will occur + + return AsyncPreadSome(offset, asio::buffer(buf, nbyte), std::set(), + [handler](const Status &status, const std::string &dn_id, size_t bytes_read){ + (void)dn_id; + handler(status, bytes_read); } + ); +} + +size_t FileHandleImpl::PositionRead(void *buf, size_t nbyte, off_t offset) { + auto stat = std::make_shared>(); + std::future future(stat->get_future()); + + /* wrap async call with promise/future to make it blocking */ + size_t read_count = 0; + auto callback = [stat, &read_count](const Status &s, size_t bytes) { + stat->set_value(s); + read_count = bytes; + }; + + PositionRead(buf, nbyte, offset, callback); + + /* wait for async to finish */ + auto s = future.get(); + + if (!s.ok()) { + return -1; + } + return (ssize_t)read_count; +} + +/* + * Note that this method must be thread-safe w.r.t. the unsafe operations occurring + * on the FileHandle + */ +CancelHandle FileHandleImpl::AsyncPreadSome( + size_t offset, const MutableBuffers &buffers, + const std::set &excluded_datanodes, + const std::function handler) { + using ::hadoop::hdfs::DatanodeInfoProto; + using ::hadoop::hdfs::LocatedBlockProto; + + auto it = std::find_if( + file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) { + return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); + }); + + if (it == file_info_->blocks_.end()) { + handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); + return CancelHandle(nullptr); + } + + ::hadoop::hdfs::LocatedBlockProto targetBlock = *it; + + const DatanodeInfoProto *chosen_dn = nullptr; + for (int i = 0; i < targetBlock.locs_size(); ++i) { + const auto &di = targetBlock.locs(i); + if (!excluded_datanodes.count(di.id().datanodeuuid())) { + chosen_dn = &di; + break; + } + } + + if (!chosen_dn) { + handler(Status::ResourceUnavailable("No datanodes available"), "", 0); + return CancelHandle(nullptr); + } + + uint64_t offset_within_block = offset - targetBlock.offset(); + uint64_t size_within_block = std::min( + targetBlock.b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + + // This is where we will put the logic for re-using a DN connection; we can + // steal the FileHandle's dn and put it back when we're done + std::shared_ptr dn = std::make_shared(io_service_, *chosen_dn, nullptr /*token*/); + std::string dn_id = dn->uuid_; + std::string client_name = client_name_; + + // Wrap the DN in a block reader to handle the state and logic of the + // block request protocol + std::shared_ptr reader; + reader.reset(new BlockReaderImpl(BlockReaderOptions(), dn)); + + + auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) { + handler(status, dn_id, transferred); + }; + + dn->Connect([handler,read_handler,targetBlock,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] + (Status status, std::shared_ptr dn) { + (void)dn; + if (status.ok()) { + reader->AsyncReadBlock( + client_name, targetBlock, offset_within_block, + asio::buffer(buffers, size_within_block), read_handler); + } else { + handler(status, dn_id, 0); + } + }); + + return CancelHandle(dn); +} + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h new file mode 100644 index 0000000000000..60f34ba1388be --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ +#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ + +#include "common/hdfs_public_api.h" +#include "common/async_stream.h" +#include "reader/fileinfo.h" + +#include "asio.hpp" + +#include + +namespace hdfs { + +/* + * FileHandle: coordinates operations on a particular file in HDFS + * + * Threading model: not thread-safe; consumers and io_service should not call + * concurrently. PositionRead and cancel() are the exceptions; they can be + * called concurrently and repeatedly. + * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is + * resonsible for freeing the object. + */ +class FileHandleImpl : public FileHandle { +public: + FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr file_info); + + /* + * [Some day reliably] Reads a particular offset into the data file. + * On error, bytes_read returns the number of bytes successfully read; on + * success, bytes_read will equal nbyte + */ + CancelHandle PositionRead( + void *buf, + size_t nbyte, + uint64_t offset, + const std::function &handler + ) override; + size_t PositionRead(void *buf, size_t bytes_read, off_t offset) override; + + /* + * Reads some amount of data into the buffer. Will attempt to find the best + * datanode and read data from it. + * + * If an error occurs during connection or transfer, the callback will be + * called with bytes_read equal to the number of bytes successfully transferred. + * If no data nodes can be found, status will be Status::ResourceUnavailable. + * + */ + CancelHandle AsyncPreadSome(size_t offset, const MutableBuffers &buffers, + const std::set &excluded_datanodes, + const std::function handler); +private: + ::asio::io_service * const io_service_; + const std::string client_name_; + const std::shared_ptr file_info_; + // Shared pointer to the FileSystem's dead nodes object goes here +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index 0b958a828e9b6..5f63aa136bb97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -22,7 +22,9 @@ #include +#include #include +#include namespace hdfs { @@ -32,37 +34,17 @@ static const int kNamenodeProtocolVersion = 1; using ::asio::ip::tcp; -FileSystem::~FileSystem() {} +/***************************************************************************** + * NAMENODE OPERATIONS + ****************************************************************************/ -void FileSystem::New( - IoService *io_service, const Options &options, const std::string &server, - const std::string &service, - const std::function &handler) { - FileSystemImpl *impl = new FileSystemImpl(io_service, options); - impl->Connect(server, service, [impl, handler](const Status &stat) { - if (stat.ok()) { - handler(stat, impl); - } else { - delete impl; - handler(stat, nullptr); - } - }); -} - -FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options) - : io_service_(static_cast(io_service)), - engine_(&io_service_->io_service(), options, - RpcEngine::GetRandomClientName(), kNamenodeProtocol, - kNamenodeProtocolVersion), - namenode_(&engine_) {} - -void FileSystemImpl::Connect(const std::string &server, +void NameNodeOperations::Connect(const std::string &server, const std::string &service, - std::function &&handler) { - using namespace continuation; + std::function &handler) { + using namespace asio_continuation; typedef std::vector State; auto m = Pipeline::Create(); - m->Push(Resolve(&io_service_->io_service(), server, service, + m->Push(Resolve(io_service_, server, service, std::back_inserter(m->state()))) .Push(Bind([this, m](const Continuation::Next &next) { engine_.Connect(m->state().front(), next); @@ -75,9 +57,9 @@ void FileSystemImpl::Connect(const std::string &server, }); } -void FileSystemImpl::Open( - const std::string &path, - const std::function &handler) { +void NameNodeOperations::GetBlockLocations(const std::string & path, + std::function)> handler) +{ using ::hadoop::hdfs::GetBlockLocationsRequestProto; using ::hadoop::hdfs::GetBlockLocationsResponseProto; @@ -98,9 +80,174 @@ void FileSystemImpl::Open( [this, s](const continuation::Continuation::Next &next) { namenode_.GetBlockLocations(&s->req, s->resp, next); })); + m->Run([this, handler](const Status &stat, const State &s) { - handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations()) + if (stat.ok()) { + auto file_info = std::make_shared(); + auto locations = s.resp->locations(); + + file_info->file_length_ = locations.filelength(); + + for (const auto &block : locations.blocks()) { + file_info->blocks_.push_back(block); + } + + if (locations.has_lastblock() && locations.lastblock().b().numbytes()) { + file_info->blocks_.push_back(locations.lastblock()); + } + + handler(stat, file_info); + } else { + handler(stat, nullptr); + } + }); +} + + +/***************************************************************************** + * FILESYSTEM BASE CLASS + ****************************************************************************/ + +void FileSystem::New( + IoService *io_service, const Options &options, const std::string &server, + const std::string &service, + const std::function &handler) { + FileSystemImpl *impl = new FileSystemImpl(io_service, options); + impl->Connect(server, service, [impl, handler](const Status &stat) { + if (stat.ok()) { + handler(stat, impl); + } else { + delete impl; + handler(stat, nullptr); + } + }); +} + +FileSystem * FileSystem::New( + IoService *io_service, const Options &options, const std::string &server, + const std::string &service) { + auto stat = std::make_shared>>(); + std::future> future = stat->get_future(); + + auto callback = [stat](const Status &s, FileSystem * fs) { + std::pair value(s, fs); + stat->set_value(value); + }; + + New(io_service, options, server, service, callback); + + /* block until promise is set */ + auto s = future.get(); + + if (s.first.ok()) + return s.second; + else + return nullptr; +} + +/***************************************************************************** + * FILESYSTEM IMPLEMENTATION + ****************************************************************************/ + +FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options) + : io_service_(static_cast(io_service)), + nn_(&io_service_->io_service(), options, + GetRandomClientName(), kNamenodeProtocol, + kNamenodeProtocolVersion), + client_name_(GetRandomClientName()) +{ + // Poor man's move + io_service = nullptr; + + /* spawn background threads for asio delegation */ + unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */; + for (unsigned int i = 0; i < threads; i++) { + AddWorkerThread(); + } +} + +FileSystemImpl::~FileSystemImpl() { + /** + * Note: IoService must be stopped before getting rid of worker threads. + * Once worker threads are joined and deleted the service can be deleted. + **/ + io_service_->Stop(); + worker_threads_.clear(); + io_service_.reset(nullptr); +} + +void FileSystemImpl::Connect(const std::string &server, + const std::string &service, + std::function &&handler) { + /* IoService::New can return nullptr */ + if (!io_service_) { + handler (Status::Error("Null IoService")); + } + nn_.Connect(server, service, handler); +} + +Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { + /* synchronized */ + auto stat = std::make_shared>(); + std::future future = stat->get_future(); + + auto callback = [stat](const Status &s) { + stat->set_value(s); + }; + + Connect(server, service, callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +int FileSystemImpl::AddWorkerThread() { + auto service_task = [](IoService *service) { service->Run(); }; + worker_threads_.push_back( + WorkerPtr(new std::thread(service_task, io_service_.get()))); + return worker_threads_.size(); +} + +void FileSystemImpl::Open( + const std::string &path, + const std::function &handler) { + + nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr file_info) { + handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info) : nullptr); }); } + +Status FileSystemImpl::Open(const std::string &path, + FileHandle **handle) { + auto stat = std::make_shared>(); + std::future future = stat->get_future(); + + /* wrap async FileSystem::Open with promise to make it a blocking call */ + FileHandle *input_stream = nullptr; + auto h = [stat, &input_stream](const Status &s, FileHandle *is) { + stat->set_value(s); + input_stream = is; + }; + + Open(path, h); + + /* block until promise is set */ + auto s = future.get(); + + if (!s.ok()) { + delete input_stream; + return s; + } + if (!input_stream) { + return s; + } + + *handle = input_stream; + return s; +} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 72f80b7b3456d..9740e9e26ecf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -18,61 +18,108 @@ #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ +#include "filehandle.h" #include "common/hdfs_public_api.h" +#include "common/async_stream.h" #include "libhdfspp/hdfs.h" #include "rpc/rpc_engine.h" +#include "reader/block_reader.h" +#include "reader/fileinfo.h" #include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.hrpc.inl" +#include "asio.hpp" + +#include + namespace hdfs { -class FileSystemImpl : public FileSystem { +/** + * NameNodeConnection: abstracts the details of communicating with a NameNode + * and the implementation of the communications protocol. + * + * Will eventually handle retry and failover. + * + * Threading model: thread-safe; all operations can be called concurrently + * Lifetime: owned by a FileSystemImpl + */ +class NameNodeOperations { public: - FileSystemImpl(IoService *io_service, const Options &options); - void Connect(const std::string &server, const std::string &service, - std::function &&handler); - virtual void Open(const std::string &path, - const std::function - &handler) override; - RpcEngine &rpc_engine() { return engine_; } + NameNodeOperations(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const char *protocol_name, + int protocol_version) : + io_service_(io_service), + engine_(io_service, options, client_name, protocol_name, protocol_version), + namenode_(& engine_) {} + + void Connect(const std::string &server, + const std::string &service, + std::function &handler); + + void GetBlockLocations(const std::string & path, + std::function)> handler); private: - IoServiceImpl *io_service_; + ::asio::io_service * io_service_; RpcEngine engine_; ClientNamenodeProtocol namenode_; }; -class InputStreamImpl : public InputStream { +/* + * FileSystem: The consumer's main point of interaction with the cluster as + * a whole. + * + * Initially constructed in a disconnected state; call Connect before operating + * on the FileSystem. + * + * All open files must be closed before the FileSystem is destroyed. + * + * Threading model: thread-safe for all operations + * Lifetime: pointer created for consumer who is responsible for deleting it + */ +class FileSystemImpl : public FileSystem { public: - InputStreamImpl(FileSystemImpl *fs, - const ::hadoop::hdfs::LocatedBlocksProto *blocks); - virtual void - PositionRead(void *buf, size_t nbyte, uint64_t offset, - const std::set &excluded_datanodes, - const std::function &handler) override; - template - void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, - const std::set &excluded_datanodes, - const Handler &handler); - template - void AsyncReadBlock(const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, - const Handler &handler); + FileSystemImpl(IoService *&io_service, const Options &options); + ~FileSystemImpl() override; + + /* attempt to connect to namenode, return bad status on failure */ + void Connect(const std::string &server, const std::string &service, + std::function &&handler); + /* attempt to connect to namenode, return bad status on failure */ + Status Connect(const std::string &server, const std::string &service); + + + virtual void Open(const std::string &path, + const std::function + &handler) override; + Status Open(const std::string &path, FileHandle **handle) override; + + + /* add a new thread to handle asio requests, return number of threads in pool + */ + int AddWorkerThread(); + + /* how many worker threads are servicing asio requests */ + int WorkerThreadCount() { return worker_threads_.size(); } + private: - FileSystemImpl *fs_; - unsigned long long file_length_; - std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; - template struct HandshakeContinuation; - template - struct ReadBlockContinuation; - struct RemoteBlockReaderTrait; + std::unique_ptr io_service_; + NameNodeOperations nn_; + const std::string client_name_; + + struct WorkerDeleter { + void operator()(std::thread *t) { + t->join(); + delete t; + } + }; + typedef std::unique_ptr WorkerPtr; + std::vector worker_threads_; + }; -} -#include "inputstream_impl.h" + +} #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h deleted file mode 100644 index 0d3b3027bb862..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef FS_INPUTSTREAM_IMPL_H_ -#define FS_INPUTSTREAM_IMPL_H_ - -#include "reader/block_reader.h" - -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include -#include -#include - -namespace hdfs { - -struct InputStreamImpl::RemoteBlockReaderTrait { - typedef RemoteBlockReader Reader; - struct State { - std::unique_ptr conn_; - std::shared_ptr reader_; - std::array endpoints_; - size_t transferred_; - Reader *reader() { return reader_.get(); } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - static continuation::Pipeline * - CreatePipeline(::asio::io_service *io_service, - const ::hadoop::hdfs::DatanodeInfoProto &dn) { - using namespace ::asio::ip; - auto m = continuation::Pipeline::Create(); - auto &s = m->state(); - s.conn_.reset(new tcp::socket(*io_service)); - s.reader_ = std::make_shared(BlockReaderOptions(), s.conn_.get()); - auto datanode = dn.id(); - s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()), - datanode.xferport()); - - m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(), - s.endpoints_.end())); - return m; - } -}; - -template -struct InputStreamImpl::HandshakeContinuation : continuation::Continuation { - HandshakeContinuation(Reader *reader, const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset) - : reader_(reader), client_name_(client_name), length_(length), - offset_(offset) { - if (token) { - token_.reset(new hadoop::common::TokenProto()); - token_->CheckTypeAndMergeFrom(*token); - } - block_.CheckTypeAndMergeFrom(*block); - } - - virtual void Run(const Next &next) override { - reader_->async_connect(client_name_, token_.get(), &block_, length_, - offset_, next); - } - -private: - Reader *reader_; - const std::string client_name_; - std::unique_ptr token_; - hadoop::hdfs::ExtendedBlockProto block_; - uint64_t length_; - uint64_t offset_; -}; - -template -struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation { - ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer, - size_t *transferred) - : reader_(reader), buffer_(buffer), - buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) { - static_assert(!std::is_reference::value, - "Buffer must not be a reference type"); - } - - virtual void Run(const Next &next) override { - *transferred_ = 0; - next_ = next; - OnReadData(Status::OK(), 0); - } - -private: - Reader *reader_; - const MutableBufferSequence buffer_; - const size_t buffer_size_; - size_t *transferred_; - std::function next_; - - void OnReadData(const Status &status, size_t transferred) { - using std::placeholders::_1; - using std::placeholders::_2; - *transferred_ += transferred; - if (!status.ok()) { - next_(status); - } else if (*transferred_ >= buffer_size_) { - next_(status); - } else { - reader_->async_read_some( - asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), - std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); - } - } -}; - -template -void InputStreamImpl::AsyncPreadSome( - size_t offset, const MutableBufferSequence &buffers, - const std::set &excluded_datanodes, const Handler &handler) { - using ::hadoop::hdfs::DatanodeInfoProto; - using ::hadoop::hdfs::LocatedBlockProto; - - auto it = std::find_if( - blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) { - return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); - }); - - if (it == blocks_.end()) { - handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); - return; - } - - const DatanodeInfoProto *chosen_dn = nullptr; - for (int i = 0; i < it->locs_size(); ++i) { - const auto &di = it->locs(i); - if (!excluded_datanodes.count(di.id().datanodeuuid())) { - chosen_dn = &di; - break; - } - } - - if (!chosen_dn) { - handler(Status::ResourceUnavailable("No datanodes available"), "", 0); - return; - } - - uint64_t offset_within_block = offset - it->offset(); - uint64_t size_within_block = std::min( - it->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); - - AsyncReadBlock( - fs_->rpc_engine().client_name(), *it, *chosen_dn, offset_within_block, - asio::buffer(buffers, size_within_block), handler); -} - -template -void InputStreamImpl::AsyncReadBlock( - const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, const Handler &handler) { - - typedef typename BlockReaderTrait::Reader Reader; - auto m = - BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn); - auto &s = m->state(); - size_t size = asio::buffer_size(buffers); - m->Push(new HandshakeContinuation(s.reader(), client_name, nullptr, - &block.b(), size, offset)) - .Push(new ReadBlockContinuation( - s.reader(), buffers, s.transferred())); - const std::string &dnid = dn.id().datanodeuuid(); - m->Run([handler, dnid](const Status &status, - const typename BlockReaderTrait::State &state) { - handler(status, dnid, *state.transferred()); - }); -} -} - -#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt index 71e28ac619901..0dcae29e88d18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt @@ -16,5 +16,5 @@ # limitations under the License. # -add_library(reader remote_block_reader.cc datatransfer.cc) +add_library(reader block_reader.cc datatransfer.cc) add_dependencies(reader proto) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc similarity index 60% rename from hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h rename to hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index 35c2ce46ea9e5..a4e21de11baeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -15,18 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ -#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ - -#include "datatransfer.h" +#include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "common/continuation/continuation.h" #include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include -#include -#include - -#include #include @@ -36,14 +28,31 @@ hadoop::hdfs::OpReadBlockProto ReadBlockProto(const std::string &client_name, bool verify_checksum, const hadoop::common::TokenProto *token, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); + uint64_t offset) { + using namespace hadoop::hdfs; + using namespace hadoop::common; + BaseHeaderProto *base_h = new BaseHeaderProto(); + base_h->set_allocated_block(new ExtendedBlockProto(*block)); + if (token) { + base_h->set_allocated_token(new TokenProto(*token)); + } + ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); + h->set_clientname(client_name); + h->set_allocated_baseheader(base_h); + + OpReadBlockProto p; + p.set_allocated_header(h); + p.set_offset(offset); + p.set_len(length); + p.set_sendchecksums(verify_checksum); + // TODO: p.set_allocated_cachingstrategy(); + return p; +} -template -template -void RemoteBlockReader::async_connect( - const std::string &client_name, const hadoop::common::TokenProto *token, +void BlockReaderImpl::AsyncRequestBlock( + const std::string &client_name, const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset, const ConnectHandler &handler) { + uint64_t offset, const std::function &handler) { // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect @@ -62,18 +71,17 @@ void RemoteBlockReader::async_connect( s->header.insert(s->header.begin(), {0, kDataTransferVersion, Operation::kReadBlock}); s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum, - token, block, length, offset)); + dn_->token_.get(), block, length, offset)); auto read_pb_message = - new continuation::ReadDelimitedPBMessageContinuation( - stream_, &s->response); + new continuation::ReadDelimitedPBMessageContinuation( + dn_, &s->response); - m->Push(continuation::Write(stream_, asio::buffer(s->header))) - .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request)) + m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header))) + .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request)) .Push(read_pb_message); - m->Run([this, handler, offset](const Status &status, const State &s) { - Status stat = status; + m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; if (stat.ok()) { const auto &resp = s.response; if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { @@ -90,10 +98,26 @@ void RemoteBlockReader::async_connect( }); } -template -struct RemoteBlockReader::ReadPacketHeader +Status BlockReaderImpl::RequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset) { + auto stat = std::make_shared>(); + std::future future(stat->get_future()); + AsyncRequestBlock(client_name, block, length, offset, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +hadoop::hdfs::OpReadBlockProto +ReadBlockProto(const std::string &client_name, bool verify_checksum, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset); + +struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation { - ReadPacketHeader(RemoteBlockReader *parent) : parent_(parent) {} + ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { parent_->packet_data_read_bytes_ = 0; @@ -113,7 +137,7 @@ struct RemoteBlockReader::ReadPacketHeader next(status); }; - asio::async_read(*parent_->stream_, asio::buffer(buf_), + asio::async_read(*parent_->dn_, asio::buffer(buf_), std::bind(&ReadPacketHeader::CompletionHandler, this, std::placeholders::_1, std::placeholders::_2), handler); @@ -127,7 +151,7 @@ struct RemoteBlockReader::ReadPacketHeader static const size_t kHeaderLenSize = sizeof(int16_t); static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; - RemoteBlockReader *parent_; + BlockReaderImpl *parent_; std::array buf_; size_t packet_length() const { @@ -149,9 +173,8 @@ struct RemoteBlockReader::ReadPacketHeader } }; -template -struct RemoteBlockReader::ReadChecksum : continuation::Continuation { - ReadChecksum(RemoteBlockReader *parent) : parent_(parent) {} +struct BlockReaderImpl::ReadChecksum : continuation::Continuation { + ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { auto parent = parent_; @@ -172,20 +195,58 @@ struct RemoteBlockReader::ReadChecksum : continuation::Continuation { }; parent->checksum_.resize(parent->packet_len_ - sizeof(int) - parent->header_.datalen()); - asio::async_read(*parent->stream_, asio::buffer(parent->checksum_), - handler); + asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler); + } + +private: + BlockReaderImpl *parent_; +}; + +struct BlockReaderImpl::ReadData : continuation::Continuation { + ReadData(BlockReaderImpl *parent, + std::shared_ptr bytes_transferred, + const asio::mutable_buffers_1 &buf) + : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) { + buf_.begin(); + } + + ~ReadData() { + buf_.end(); + } + + virtual void Run(const Next &next) override { + auto handler = + [next, this](const asio::error_code &ec, size_t transferred) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } + *bytes_transferred_ += transferred; + parent_->bytes_to_read_ -= transferred; + parent_->packet_data_read_bytes_ += transferred; + if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { + parent_->state_ = kReadPacketHeader; + } + next(status); + }; + + auto data_len = + parent_->header_.datalen() - parent_->packet_data_read_bytes_; + asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), + handler); } private: - RemoteBlockReader *parent_; + BlockReaderImpl *parent_; + std::shared_ptr bytes_transferred_; + const asio::mutable_buffers_1 buf_; }; -template -struct RemoteBlockReader::ReadPadding : continuation::Continuation { - ReadPadding(RemoteBlockReader *parent) +struct BlockReaderImpl::ReadPadding : continuation::Continuation { + ReadPadding(BlockReaderImpl *parent) : parent_(parent), padding_(parent->chunk_padding_bytes_), bytes_transferred_(std::make_shared(0)), - read_data_(new ReadData( + read_data_(new ReadData( parent, bytes_transferred_, asio::buffer(padding_))) {} virtual void Run(const Next &next) override { @@ -207,7 +268,7 @@ struct RemoteBlockReader::ReadPadding : continuation::Continuation { } private: - RemoteBlockReader *parent_; + BlockReaderImpl *parent_; std::vector padding_; std::shared_ptr bytes_transferred_; std::shared_ptr read_data_; @@ -215,45 +276,9 @@ struct RemoteBlockReader::ReadPadding : continuation::Continuation { ReadPadding &operator=(const ReadPadding &) = delete; }; -template -template -struct RemoteBlockReader::ReadData : continuation::Continuation { - ReadData(RemoteBlockReader *parent, - std::shared_ptr bytes_transferred, - const MutableBufferSequence &buf) - : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {} - virtual void Run(const Next &next) override { - auto handler = - [next, this](const asio::error_code &ec, size_t transferred) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } - *bytes_transferred_ += transferred; - parent_->bytes_to_read_ -= transferred; - parent_->packet_data_read_bytes_ += transferred; - if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { - parent_->state_ = kReadPacketHeader; - } - next(status); - }; - - auto data_len = - parent_->header_.datalen() - parent_->packet_data_read_bytes_; - async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len), - handler); - } - -private: - RemoteBlockReader *parent_; - std::shared_ptr bytes_transferred_; - MutableBufferSequence buf_; -}; - -template -struct RemoteBlockReader::AckRead : continuation::Continuation { - AckRead(RemoteBlockReader *parent) : parent_(parent) {} +struct BlockReaderImpl::AckRead : continuation::Continuation { + AckRead(BlockReaderImpl *parent) : parent_(parent) {} virtual void Run(const Next &next) override { if (parent_->bytes_to_read_ > 0) { @@ -268,25 +293,24 @@ struct RemoteBlockReader::AckRead : continuation::Continuation { : hadoop::hdfs::Status::SUCCESS); m->Push( - continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state())); + continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); m->Run([this, next](const Status &status, const hadoop::hdfs::ClientReadStatusProto &) { if (status.ok()) { - parent_->state_ = RemoteBlockReader::kFinished; + parent_->state_ = BlockReaderImpl::kFinished; } next(status); }); } private: - RemoteBlockReader *parent_; + BlockReaderImpl *parent_; }; -template -template -void RemoteBlockReader::async_read_some( - const MutableBufferSequence &buffers, const ReadHandler &handler) { +void BlockReaderImpl::AsyncReadPacket( + const MutableBuffers &buffers, + const std::function &handler) { assert(state_ != kOpen && "Not connected"); struct State { @@ -298,7 +322,7 @@ void RemoteBlockReader::async_read_some( m->Push(new ReadPacketHeader(this)) .Push(new ReadChecksum(this)) .Push(new ReadPadding(this)) - .Push(new ReadData( + .Push(new ReadData( this, m->state().bytes_transferred, buffers)) .Push(new AckRead(this)); @@ -308,15 +332,14 @@ void RemoteBlockReader::async_read_some( }); } -template -template + size_t -RemoteBlockReader::read_some(const MutableBufferSequence &buffers, +BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, Status *status) { size_t transferred = 0; auto done = std::make_shared>(); auto future = done->get_future(); - async_read_some(buffers, + AsyncReadPacket(buffers, [status, &transferred, done](const Status &stat, size_t t) { *status = stat; transferred = t; @@ -326,17 +349,85 @@ RemoteBlockReader::read_some(const MutableBufferSequence &buffers, return transferred; } -template -Status RemoteBlockReader::connect( - const std::string &client_name, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - auto stat = std::make_shared>(); - std::future future(stat->get_future()); - async_connect(client_name, token, block, length, offset, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} + +struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation { + RequestBlockContinuation(BlockReader *reader, const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset) + : reader_(reader), client_name_(client_name), length_(length), + offset_(offset) { + block_.CheckTypeAndMergeFrom(*block); + } + + virtual void Run(const Next &next) override { + reader_->AsyncRequestBlock(client_name_, &block_, length_, + offset_, next); + } + +private: + BlockReader *reader_; + const std::string client_name_; + hadoop::hdfs::ExtendedBlockProto block_; + uint64_t length_; + uint64_t offset_; +}; + +struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation { + ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, + size_t *transferred) + : reader_(reader), buffer_(buffer), + buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) { + } + + virtual void Run(const Next &next) override { + *transferred_ = 0; + next_ = next; + OnReadData(Status::OK(), 0); + } + +private: + BlockReader *reader_; + const MutableBuffers buffer_; + const size_t buffer_size_; + size_t *transferred_; + std::function next_; + + void OnReadData(const Status &status, size_t transferred) { + using std::placeholders::_1; + using std::placeholders::_2; + *transferred_ += transferred; + if (!status.ok()) { + next_(status); + } else if (*transferred_ >= buffer_size_) { + next_(status); + } else { + reader_->AsyncReadPacket( + asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), + std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); + } + } +}; + +void BlockReaderImpl::AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, + size_t offset, + const MutableBuffers &buffers, + const std::function handler) { + + auto m = continuation::Pipeline::Create(); + size_t * bytesTransferred = &m->state(); + + size_t size = asio::buffer_size(buffers); + + m->Push(new RequestBlockContinuation(this, client_name, + &block.b(), size, offset)) + .Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); + + m->Run([handler] (const Status &status, + const size_t totalBytesTransferred) { + handler(status, totalBytesTransferred); + }); } -#endif +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index 81636b9bfff4f..140286b9fcdbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -19,7 +19,9 @@ #define BLOCK_READER_H_ #include "libhdfspp/status.h" +#include "common/async_stream.h" #include "datatransfer.pb.h" +#include "connection/datanodeconnection.h" #include @@ -55,38 +57,73 @@ struct BlockReaderOptions { : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} }; -template -class RemoteBlockReader - : public std::enable_shared_from_this> { +/** + * Handles the operational state of request and reading a block (or portion of + * a block) from a DataNode. + * + * Threading model: not thread-safe. + * Lifecycle: should be created, used for a single read, then freed. + */ +class BlockReader { public: - explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream) - : stream_(stream), state_(kOpen), options_(options), + virtual void AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function handler) = 0; + + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function &handler) = 0; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function &handler) = 0; +}; + +class BlockReaderImpl + : public BlockReader, public std::enable_shared_from_this { +public: + explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr dn) + : dn_(dn), state_(kOpen), options_(options), chunk_padding_bytes_(0) {} - template - void async_read_some(const MutableBufferSequence &buffers, - const ReadHandler &handler); + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function &handler) override; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function &handler) override; - template - size_t read_some(const MutableBufferSequence &buffers, Status *status); + virtual void AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function handler) override; - Status connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); + size_t ReadPacket(const MutableBuffers &buffers, Status *status); - template - void async_connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset, - const ConnectHandler &handler); + Status RequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset); private: + struct RequestBlockContinuation; + struct ReadBlockContinuation; + struct ReadPacketHeader; struct ReadChecksum; struct ReadPadding; - template struct ReadData; + struct ReadData; struct AckRead; enum State { kOpen, @@ -97,7 +134,7 @@ class RemoteBlockReader kFinished, }; - Stream *stream_; + std::shared_ptr dn_; hadoop::hdfs::PacketHeaderProto header_; State state_; BlockReaderOptions options_; @@ -109,6 +146,4 @@ class RemoteBlockReader }; } -#include "remote_block_reader_impl.h" - #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h index 511c2eb9e208a..c70a075f6e6b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -19,6 +19,10 @@ #define LIB_READER_DATA_TRANSFER_H_ #include "common/sasl_authenticator.h" +#include "common/async_stream.h" +#include "connection/datanodeconnection.h" +#include + namespace hdfs { @@ -32,26 +36,34 @@ enum Operation { kReadBlock = 81, }; -template class DataTransferSaslStream { +template class DataTransferSaslStream : public DataNodeConnection { public: - DataTransferSaslStream(Stream *stream, const std::string &username, + DataTransferSaslStream(std::shared_ptr stream, const std::string &username, const std::string &password) : stream_(stream), authenticator_(username, password) {} template void Handshake(const Handler &next); - template - void async_read_some(const MutableBufferSequence &buffers, - ReadHandler &&handler); + void async_read_some(const MutableBuffers &buf, + std::function handler) override { + stream_->async_read_some(buf, handler); + } + + void async_write_some(const ConstBuffers &buf, + std::function handler) override { + stream_->async_write_some(buf, handler); + } - template - void async_write_some(const ConstBufferSequence &buffers, - WriteHandler &&handler); + void cancel() override { stream_->cancel(); } + void Connect(std::function dn)> handler) override + {(void)handler; /*TODO: Handshaking goes here*/}; private: DataTransferSaslStream(const DataTransferSaslStream &) = delete; DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; - Stream *stream_; + std::shared_ptr stream_; DigestMD5Authenticator authenticator_; struct ReadSaslMessage; struct Authenticator; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h index 088b86e5dc6ff..3ca16e958165a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h @@ -70,7 +70,7 @@ struct DataTransferSaslStream::Authenticator template struct DataTransferSaslStream::ReadSaslMessage : continuation::Continuation { - ReadSaslMessage(Stream *stream, std::string *data) + ReadSaslMessage(std::shared_ptr stream, std::string *data) : stream_(stream), data_(data), read_pb_(stream, &resp_) {} virtual void Run(const Next &next) override { @@ -87,7 +87,7 @@ struct DataTransferSaslStream::ReadSaslMessage } private: - Stream *stream_; + std::shared_ptr stream_; std::string *data_; hadoop::hdfs::DataTransferEncryptorMessageProto resp_; continuation::ReadDelimitedPBMessageContinuation read_pb_; @@ -97,7 +97,7 @@ template template void DataTransferSaslStream::Handshake(const Handler &next) { using ::hadoop::hdfs::DataTransferEncryptorMessageProto; - using ::hdfs::continuation::Write; + using ::hdfs::asio_continuation::Write; using ::hdfs::continuation::WriteDelimitedPBMessage; static const int kMagicNumber = htonl(kDataTransferSasl); @@ -109,7 +109,7 @@ void DataTransferSaslStream::Handshake(const Handler &next) { std::string resp0; DataTransferEncryptorMessageProto req1; std::string resp1; - Stream *stream; + std::shared_ptr stream; }; auto m = continuation::Pipeline::Create(); State *s = &m->state(); @@ -117,7 +117,7 @@ void DataTransferSaslStream::Handshake(const Handler &next) { DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); - m->Push(Write(stream_, kMagicNumberBuffer)) + m->Push(Write(stream_.get(), kMagicNumberBuffer)) .Push(WriteDelimitedPBMessage(stream_, &s->req0)) .Push(new ReadSaslMessage(stream_, &s->resp0)) .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) @@ -126,19 +126,6 @@ void DataTransferSaslStream::Handshake(const Handler &next) { m->Run([next](const Status &status, const State &) { next(status); }); } -template -template -void DataTransferSaslStream::async_read_some( - const MutableBufferSequence &buffers, ReadHandler &&handler) { - stream_->async_read_some(buffers, handler); -} - -template -template -void DataTransferSaslStream::async_write_some( - const ConstBufferSequence &buffers, WriteHandler &&handler) { - stream_->async_write_some(buffers, handler); -} } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h new file mode 100644 index 0000000000000..ad10165d5e546 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_READER_FILEINFO_H_ +#define LIB_READER_FILEINFO_H_ + +#include "ClientNamenodeProtocol.pb.h" + +namespace hdfs { + +/** + * Information that is assumed to be unchanging about a file for the duration of + * the operations. + */ +struct FileInfo { + unsigned long long file_length_; + std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc deleted file mode 100644 index 68bc4ee3376ee..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "block_reader.h" - -namespace hdfs { - -hadoop::hdfs::OpReadBlockProto -ReadBlockProto(const std::string &client_name, bool verify_checksum, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - using namespace hadoop::hdfs; - using namespace hadoop::common; - BaseHeaderProto *base_h = new BaseHeaderProto(); - base_h->set_allocated_block(new ExtendedBlockProto(*block)); - if (token) { - base_h->set_allocated_token(new TokenProto(*token)); - } - ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); - h->set_clientname(client_name); - h->set_allocated_baseheader(base_h); - - OpReadBlockProto p; - p.set_allocated_header(h); - p.set_offset(offset); - p.set_len(length); - p.set_sendchecksums(verify_checksum); - // TODO: p.set_allocated_cachingstrategy(); - return p; -} -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 83721a76c5dbb..29d455fa65293 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -19,9 +19,6 @@ #include "rpc_connection.h" #include "common/util.h" -#include - -#include #include namespace hdfs { @@ -83,15 +80,4 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, return future.get(); } -std::string RpcEngine::GetRandomClientName() { - unsigned char buf[6] = { - 0, - }; - RAND_pseudo_bytes(buf, sizeof(buf)); - - std::stringstream ss; - ss << "libhdfs++_" - << Base64Encode(std::string(reinterpret_cast(buf), sizeof(buf))); - return ss.str(); -} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index eca878e1d249a..7615ba0394f27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -26,7 +26,7 @@ protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS ) add_executable(remote_block_reader_test remote_block_reader_test.cc $) -target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(remote_block_reader_test reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(remote_block_reader remote_block_reader_test) add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) @@ -34,10 +34,10 @@ target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_mai add_test(sasl_digest_md5 sasl_digest_md5_test) add_executable(inputstream_test inputstream_test.cc) -target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(inputstream_test fs rpc reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(inputstream inputstream_test) include_directories(${CMAKE_CURRENT_BINARY_DIR}) add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $) -target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(rpc_engine_test rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(rpc_engine rpc_engine_test) diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc index aa95256b7e1fe..c6f2c20e0f9a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc @@ -17,7 +17,10 @@ */ #include "fs/filesystem.h" +#include "common/util.h" + #include +#include using hadoop::common::TokenProto; using hadoop::hdfs::DatanodeInfoProto; @@ -34,158 +37,40 @@ using namespace hdfs; namespace hdfs { -class MockReader { +class MockReader : public BlockReader { public: - virtual ~MockReader() {} MOCK_METHOD2( - async_read_some, + AsyncReadPacket, void(const asio::mutable_buffers_1 &, const std::function &)); - MOCK_METHOD6(async_connect, - void(const std::string &, TokenProto *, ExtendedBlockProto *, - uint64_t, uint64_t, - const std::function &)); -}; - -template struct MockBlockReaderTrait { - typedef MockReader Reader; - struct State { - MockReader reader_; - size_t transferred_; - Reader *reader() { return &reader_; } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - - static continuation::Pipeline * - CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) { - auto m = continuation::Pipeline::Create(); - *m->state().transferred() = 0; - Trait::InitializeMockReader(m->state().reader()); - return m; - } + MOCK_METHOD5(AsyncRequestBlock, + void(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const std::function &handler)); + + MOCK_METHOD5(AsyncReadBlock, void( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function handler)); }; -} - -TEST(InputStreamTest, TestReadSingleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - is.AsyncReadBlock>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; } -TEST(InputStreamTest, TestReadMultipleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .Times(4) - .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); - } - }; - - is.AsyncReadBlock>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; -} - -TEST(InputStreamTest, TestReadError) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); - } - }; - - is.AsyncReadBlock>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_FALSE(stat.ok()); - ASSERT_EQ(sizeof(buf) / 4 * 3, read); - read = 0; -} - -TEST(InputStreamTest, TestExcludeDataNode) { - LocatedBlocksProto blocks; - LocatedBlockProto *block = blocks.add_blocks(); - ExtendedBlockProto *b = block->mutable_b(); +TEST(FileHandleTest, TestExcludeDataNode) { + auto file_info = std::make_shared(); + file_info->blocks_.push_back(LocatedBlockProto()); + LocatedBlockProto & block = file_info->blocks_[0]; + ExtendedBlockProto *b = block.mutable_b(); b->set_poolid(""); b->set_blockid(1); b->set_generationstamp(1); b->set_numbytes(4096); - DatanodeInfoProto *di = block->add_locs(); + // Set up the one block to have one datanode holding it + DatanodeInfoProto *di = block.add_locs(); DatanodeIDProto *dnid = di->mutable_id(); dnid->set_datanodeuuid("foo"); @@ -193,28 +78,19 @@ TEST(InputStreamTest, TestExcludeDataNode) { 0, }; IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); + FileHandleImpl is(&io_service.io_service(), GetRandomClientName(), file_info); Status stat; size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - + // Exclude the one datanode with the data std::set excluded_dn({"foo"}); is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn, [&stat, &read](const Status &status, const std::string &, size_t transferred) { stat = status; read = transferred; }); + + // Should fail with no resource available ASSERT_EQ(static_cast(std::errc::resource_unavailable_try_again), stat.code()); ASSERT_EQ(0UL, read); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h index 8c0ef8cf3a019..4c15375205d69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -18,6 +18,8 @@ #ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_ #define LIBHDFSPP_TEST_MOCK_CONNECTION_H_ +#include "common/async_stream.h" + #include #include #include @@ -27,13 +29,15 @@ namespace hdfs { -class MockConnectionBase { +class MockConnectionBase : public AsyncStream{ public: MockConnectionBase(::asio::io_service *io_service); virtual ~MockConnectionBase(); typedef std::pair ProducerResult; - template - void async_read_some(const MutableBufferSequence &buf, Handler &&handler) { + + void async_read_some(const MutableBuffers &buf, + std::function handler) override { if (produced_.size() == 0) { ProducerResult r = Produce(); if (r.first) { @@ -51,8 +55,9 @@ class MockConnectionBase { io_service_->post(std::bind(handler, asio::error_code(), len)); } - template - void async_write_some(const ConstBufferSequence &buf, Handler &&handler) { + void async_write_some(const ConstBuffers &buf, + std::function handler) override { // CompletionResult res = OnWrite(buf); io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf))); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index 6ae657ce91d97..80b50af44d4cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -21,6 +21,8 @@ #include "datatransfer.pb.h" #include "common/util.h" #include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "reader/fileinfo.h" #include #include @@ -36,10 +38,14 @@ using ::hadoop::hdfs::DataTransferEncryptorMessageProto; using ::hadoop::hdfs::ExtendedBlockProto; using ::hadoop::hdfs::PacketHeaderProto; using ::hadoop::hdfs::ReadOpChecksumInfoProto; +using ::hadoop::hdfs::LocatedBlockProto; +using ::hadoop::hdfs::LocatedBlocksProto; using ::asio::buffer; using ::asio::error_code; using ::asio::mutable_buffers_1; +using ::testing::_; +using ::testing::InvokeArgument; using ::testing::Return; using std::make_pair; using std::string; @@ -49,12 +55,49 @@ namespace pbio = pb::io; namespace hdfs { -class MockDNConnection : public MockConnectionBase { +class MockDNConnection : public MockConnectionBase, public DataNodeConnection{ public: MockDNConnection(::asio::io_service &io_service) : MockConnectionBase(&io_service) {} MOCK_METHOD0(Produce, ProducerResult()); + + MOCK_METHOD1(Connect, void(std::function dn)>)); + + void async_read_some(const MutableBuffers &buf, + std::function handler) override { + this->MockConnectionBase::async_read_some(buf, handler); + } + + void async_write_some(const ConstBuffers &buf, + std::function handler) override { + this->MockConnectionBase::async_write_some(buf, handler); + } + + void cancel() override {}; +}; + +// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we +// can test the logic of AsyncReadBlock +class PartialMockReader : public BlockReaderImpl { +public: + PartialMockReader() : + BlockReaderImpl(BlockReaderOptions(), std::shared_ptr()) {}; + + MOCK_METHOD2( + AsyncReadPacket, + void(const asio::mutable_buffers_1 &, + const std::function &)); + + MOCK_METHOD5(AsyncRequestBlock, + void(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const std::function &handler)); }; + + } static inline string ToDelimitedString(const pb::MessageLite *msg) { @@ -93,20 +136,102 @@ ProducePacket(const std::string &data, const std::string &checksum, return std::make_pair(error_code(), std::move(payload)); } +TEST(RemoteBlockReaderTest, TestReadSingleTrunk) { + auto file_info = std::make_shared(); + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .Times(4) + .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadError) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_FALSE(stat.ok()); + ASSERT_EQ(sizeof(buf) / 4 * 3, read); + read = 0; +} + template -static std::shared_ptr> -ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block, +static std::shared_ptr +ReadContent(std::shared_ptr conn, const ExtendedBlockProto &block, uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, const Handler &handler) { BlockReaderOptions options; - auto reader = std::make_shared>(options, conn); + auto reader = std::make_shared(options, conn); Status result; - reader->async_connect("libhdfs++", token, &block, length, offset, + reader->AsyncRequestBlock("libhdfs++", &block, length, offset, [buf, reader, handler](const Status &stat) { if (!stat.ok()) { handler(stat, 0); } else { - reader->async_read_some(buf, handler); + reader->AsyncReadPacket(buf, handler); } }); return reader; @@ -116,11 +241,11 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { static const size_t kChunkSize = 512; static const string kChunkData(kChunkSize, 'a'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); @@ -128,17 +253,20 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { block.set_poolid("foo"); block.set_blockid(0); block.set_generationstamp(0); - + + bool done = false; std::string data(kChunkSize, 0); - ReadContent(&conn, nullptr, block, kChunkSize, 0, + ReadContent(conn, block, kChunkSize, 0, buffer(const_cast(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { + [&data, &io_service, &done](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkData, data); + done = true; io_service.stop(); }); io_service.run(); + ASSERT_TRUE(done); } TEST(RemoteBlockReaderTest, TestReadWithinChunk) { @@ -148,7 +276,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared(io_service); BlockOpResponseProto block_op_resp; ReadOpChecksumInfoProto *checksum_info = block_op_resp.mutable_readopchecksuminfo(); @@ -158,7 +286,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { checksum->set_bytesperchecksum(512); block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); @@ -167,16 +295,20 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { block.set_blockid(0); block.set_generationstamp(0); + bool done = false; + string data(kLength, 0); - ReadContent(&conn, nullptr, block, data.size(), kOffset, + ReadContent(conn, block, data.size(), kOffset, buffer(const_cast(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { + [&data, &io_service,&done](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kLength, transferred); ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); + done = true; io_service.stop(); }); io_service.run(); + ASSERT_TRUE(done); } TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { @@ -184,11 +316,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { static const string kChunkData(kChunkSize, 'a'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))) .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); @@ -201,13 +333,13 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { string data(kChunkSize, 0); mutable_buffers_1 buf = buffer(const_cast(data.c_str()), data.size()); BlockReaderOptions options; - auto reader = std::make_shared >(options, &conn); + auto reader = std::make_shared(options, conn); Status result; - reader->async_connect( - "libhdfs++", nullptr, &block, data.size(), 0, + reader->AsyncRequestBlock( + "libhdfs++", &block, data.size(), 0, [buf, reader, &data, &io_service](const Status &stat) { ASSERT_TRUE(stat.ok()); - reader->async_read_some( + reader->AsyncReadPacket( buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); @@ -215,7 +347,7 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { data.clear(); data.resize(kChunkSize); transferred = 0; - reader->async_read_some( + reader->AsyncReadPacket( buf, [&data,&io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); @@ -234,7 +366,7 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) { "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," "charset=utf-8,algorithm=md5-sess"; ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); @@ -247,23 +379,23 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) { ::hadoop::hdfs:: DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); - DataTransferSaslStream sasl_conn(&conn, "foo", "bar"); + auto sasl_conn = std::make_shared >(conn, "foo", "bar"); ExtendedBlockProto block; block.set_poolid("foo"); block.set_blockid(0); block.set_generationstamp(0); std::string data(kChunkSize, 0); - sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service]( + sasl_conn->Handshake([sasl_conn, &block, &data, &io_service]( const Status &s) { ASSERT_TRUE(s.ok()); - ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, + ReadContent(sasl_conn, block, kChunkSize, 0, buffer(const_cast(data.c_str()), data.size()), [&data, &io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok());