Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
1fb1ea5
Refactored NameNodeConnection
Oct 22, 2015
c6cf517
Removed fs_ from InputStream
Oct 22, 2015
8b8190d
Moved GetBlockInfo to NN connection
Oct 22, 2015
108b54f
Moved GetBlockLocations to std::function
Oct 22, 2015
6d112a1
Added comments
Oct 22, 2015
e57b0ed
Stripped whitespace
Oct 22, 2015
c9c8212
Renamed NameNodeConnection to NameNodeOperations
Oct 27, 2015
01499b6
Renamed input_stream and asio_continuation
Oct 27, 2015
02c6783
Renamed CreatePipeline to Connect
Oct 27, 2015
5d28d02
Rename async_connect to async_request
Oct 27, 2015
9d98bf4
Renamed read_some to read_packet
Oct 28, 2015
6ced4a9
Renamed async_request to async_request_block
Oct 28, 2015
f05a771
Renamed BlockReader::request to request_block
Oct 28, 2015
fcf1585
Moved to file_info
Oct 28, 2015
a3fd975
Made file_info pointers const
Oct 28, 2015
366f488
Refactored DataNodeConnection, etc.
Oct 28, 2015
418799f
Added shared_ptr to DN_Connection
Oct 29, 2015
f043e15
Moved DNConnection into trait
Oct 29, 2015
aea859f
Trimmed whitespace
Oct 29, 2015
55d7b5d
Re-enabled IS tests
Oct 29, 2015
142efab
Cleaned up some tests
Oct 29, 2015
4bc0f44
Working on less templates
Oct 29, 2015
dd16d4f
Compiles!
Oct 29, 2015
2b14efa
Fixed DNconnection signature
Oct 30, 2015
8d143e7
Fixed segfault in ReadData
Nov 2, 2015
b6f5454
Removed BlockReader callback templates
Nov 2, 2015
3b5d712
Removed last templates from BlockReader
Nov 2, 2015
d9b9241
Moved entirely over to BlockReader w/out templates
Nov 2, 2015
5de0bce
Removed unnecessary impls
Nov 2, 2015
d5baa87
Moved DN to its own file
Nov 3, 2015
9ae3112
Moved AsyncReadBlock to blockreader
Nov 3, 2015
f180c99
Renamed RemoteBlockReader to BlockReaderImpl
Nov 3, 2015
ba823b4
Make AsyncStream match asio stream reqs
Nov 3, 2015
43eec6a
Removed stream continuations
Nov 3, 2015
c3320a2
Re-played int change
Nov 3, 2015
5d02748
Made IS.PRead concurrent
Nov 3, 2015
979500a
Moved C lib to talk to FS
Nov 3, 2015
c7aeebe
Renamed InputStream to FileHandle
Nov 3, 2015
ab8573e
Moved InputStream.cc to FileHandle.cc
Nov 3, 2015
802c923
Removed whitespace
Nov 3, 2015
967b35c
Rolled back pom.xml changes
Nov 3, 2015
951602c
Added cancel
Nov 4, 2015
999a368
Move FileHandle into its own module
Nov 4, 2015
7080f8a
Added missing files
Nov 4, 2015
7a0620c
Roll back pom file
Nov 4, 2015
6a4a43a
Some code cleanup
Nov 4, 2015
a6c5911
Cleared whitespace
Nov 4, 2015
4b99eb6
Added docs to blockreader
Nov 4, 2015
5d30337
Fixed virtual method calls in FileSystem
Nov 6, 2015
f1382d1
Retain reader through last callback
Nov 6, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "libhdfspp/options.h"
#include "libhdfspp/status.h"
#include "common/cancelable.h"

#include <functional>
#include <set>
Expand Down Expand Up @@ -54,9 +55,9 @@ class IoService {
};

/**
* Applications opens an InputStream to read files in HDFS.
* Applications opens a FileHandle to read files in HDFS.
**/
class InputStream {
class FileHandle {
public:
/**
* Read data from a specific position. The current implementation
Expand All @@ -71,12 +72,13 @@ class InputStream {
* The handler returns the datanode that serves the block and the number of
* bytes has read.
**/
virtual void
virtual CancelHandle
PositionRead(void *buf, size_t nbyte, uint64_t offset,
const std::set<std::string> &excluded_datanodes,
const std::function<void(const Status &, const std::string &,
size_t)> &handler) = 0;
virtual ~InputStream();
const std::function<void(const Status &, size_t)> &handler) = 0;

virtual size_t PositionRead(void *buf, size_t nbyte, off_t offset) = 0;

virtual ~FileHandle();
};

/**
Expand All @@ -93,15 +95,24 @@ class FileSystem {
New(IoService *io_service, const Options &options, const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem *)> &handler);

/* Synchronous call of New*/
static FileSystem *
New(IoService *io_service, const Options &options, const std::string &server,
const std::string &service);

/**
* Open a file on HDFS. The call issues an RPC to the NameNode to
* gather the locations of all blocks in the file and to return a
* new instance of the @ref InputStream object.
**/
virtual void
Open(const std::string &path,
const std::function<void(const Status &, InputStream *)> &handler) = 0;
virtual ~FileSystem();
const std::function<void(const Status &, FileHandle *)> &handler) = 0;
virtual Status Open(const std::string &path, FileHandle **handle);

virtual ~FileSystem() {};

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ add_subdirectory(fs)
add_subdirectory(reader)
add_subdirectory(rpc)
add_subdirectory(proto)
add_subdirectory(connection)
add_subdirectory(bindings)
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.


add_library(bindings_c hdfs.cc hdfs_cpp.cc)
add_library(bindings_c hdfs.cc)
add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@
* limitations under the License.
*/

#include "hdfs_cpp.h"
#include "fs/filesystem.h"

#include <hdfs/hdfs.h>
#include <string>
#include <cstring>
#include <iostream>

using namespace hdfs;

/* Seperate the handles used by the C api from the C++ API*/
struct hdfs_internal {
hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
hdfs_internal(FileSystem *p) : filesystem_(p) {}
hdfs_internal(std::unique_ptr<FileSystem> p)
: filesystem_(std::move(p)) {}
virtual ~hdfs_internal(){};
HadoopFileSystem *get_impl() { return filesystem_.get(); }
const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
FileSystem *get_impl() { return filesystem_.get(); }
const FileSystem *get_impl() const { return filesystem_.get(); }

private:
std::unique_ptr<HadoopFileSystem> filesystem_;
std::unique_ptr<FileSystem> filesystem_;
};

struct hdfsFile_internal {
Expand Down Expand Up @@ -65,17 +67,17 @@ static void ReportError(int errnum, std::string msg) {
int hdfsFileIsOpenForRead(hdfsFile file) {
/* files can only be open for reads at the moment, do a quick check */
if (file) {
return file->get_impl()->IsOpenForRead();
return true; // Update implementation when we get file writing
}
return false;
}

hdfsFS hdfsConnect(const char *nn, tPort port) {
HadoopFileSystem *fs = new HadoopFileSystem();
Status stat = fs->Connect(nn, port);
if (!stat.ok()) {
std::string port_as_string = std::to_string(port);
IoService * io_service = IoService::New();
FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
if (!fs) {
ReportError(ENODEV, "Unable to connect to NameNode.");
delete fs;
return nullptr;
}
return new hdfs_internal(fs);
Expand All @@ -102,7 +104,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
return nullptr;
}
FileHandle *f = nullptr;
Status stat = fs->get_impl()->OpenFileForRead(path, &f);
Status stat = fs->get_impl()->Open(path, &f);
if (!stat.ok()) {
return nullptr;
}
Expand Down Expand Up @@ -133,5 +135,5 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
return -1;
}

return file->get_impl()->Pread(buffer, length, position);
return file->get_impl()->PositionRead(buffer, length, position);
}

This file was deleted.

This file was deleted.

Loading