diff --git a/Flow.net/IO/DeleteFiles.cs b/Flow.net/IO/DeleteFiles.cs deleted file mode 100644 index de2e2dc..0000000 --- a/Flow.net/IO/DeleteFiles.cs +++ /dev/null @@ -1,34 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; - -namespace Flow.IO -{ - public class DeleteFiles : PipelineAction - { - public DeleteFiles() - { - SetTypeHandler>(async (context, input) => await Handler(this, context, input)); - SetTypeHandler(async (context, input) => await Handler(this, context, input)); - SetTypeHandler(async (context, input) => await Handler(this, context, input)); - } - - private static async Task Handler(DeleteFiles that, IExecutionContext context, PayloadCollection paths) - { return await Handler(that, context, paths.Cast().ToArray()); } - - private static async Task Handler(DeleteFiles that, IExecutionContext context, params FilePath[] paths) - { return await Handler(that, context, paths.Select(p => p.Path).ToArray()); } - - private static async Task Handler(DeleteFiles that, IExecutionContext context, IEnumerable paths) - { - return await Task.Run( - () => - { - Parallel.ForEach(paths, file => File.Delete(file)); - return NullResult.Instance; - }); - } - } -} diff --git a/Flow.net/IO/FileActions.cs b/Flow.net/IO/FileActions.cs new file mode 100644 index 0000000..2ddd5aa --- /dev/null +++ b/Flow.net/IO/FileActions.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using System.ComponentModel; +using System.Collections.Concurrent; + +namespace Flow.IO +{ + + + public class GetFiles : PipelineAction + { + public string DirectoryPath { get; set; } + public string SearchPattern { get; set; } = "*"; + public SearchOption SearchOption { get; set; } = SearchOption.TopDirectoryOnly; + + protected override async Task DefaultHandlerAsync(IExecutionContext context, IPayload input) + { + var directory = Format(DirectoryPath, context, NullResult.Instance, this); + var searchPattern = Format(SearchPattern, context, NullResult.Instance, this); + var files = Directory.GetFiles(directory, searchPattern, SearchOption); + return await Task.FromResult(new FilePathCollection(files)); + } + } + + public class DeleteFiles : PipelineAction + { + public DeleteFiles() + { + SetTypeHandler>(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + } + + private static async Task Handler(DeleteFiles that, IExecutionContext context, PayloadCollection paths) + { return await Handler(that, context, paths.Cast().ToArray()); } + + private static async Task Handler(DeleteFiles that, IExecutionContext context, params FilePath[] paths) + { return await Handler(that, context, paths.Select(p => p.Path).ToArray()); } + + private static async Task Handler(DeleteFiles that, IExecutionContext context, IEnumerable paths) + { + return await Task.Run( + () => + { + Parallel.ForEach(paths, file => File.Delete(file)); + return NullResult.Instance; + }); + } + } + + public class CopyFiles : PipelineAction + { + public string DirectoryPath { get; set; } + public CopyFiles() + { + SetTypeHandler>(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + } + + private static async Task Handler(CopyFiles that, IExecutionContext context, PayloadCollection paths) + { return await Handler(that, context, paths.Cast().ToArray()); } + + private static async Task Handler(CopyFiles that, IExecutionContext context, params FilePath[] paths) + { return await Handler(that, context, paths.Select(p => p.Path).ToArray()); } + + private static async Task Handler(CopyFiles that, IExecutionContext context, IEnumerable paths) + { + return await Task.Run( + () => + { + ConcurrentQueue newPaths = new ConcurrentQueue(); + Parallel.ForEach(paths, file => { + string newFile = Path.Combine(that.DirectoryPath, Path.GetFileName(file)); + File.Copy(file, newFile); + newPaths.Enqueue(newFile); + }); + return new FilePathCollection(newPaths); + }); + } + } + + public class MoveFiles: PipelineAction + { + public string DirectoryPath { get; set; } + public MoveFiles() + { + // Why use value collection and file path? + // benefits of having a FilePathCollection? only I can think of is allowing stronger config valdiations, would only be useful for a flow.gui interface, but that's a worthy cause + SetTypeHandler>(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + SetTypeHandler(async (context, input) => await Handler(this, context, input)); + } + + private static async Task Handler(MoveFiles that, IExecutionContext context, PayloadCollection paths) + { return await Handler(that, context, paths.Cast().ToArray()); } + + private static async Task Handler(MoveFiles that, IExecutionContext context, params FilePath[] paths) + { return await Handler(that, context, paths.Select(p => p.Path).ToArray()); } + + private static async Task Handler(MoveFiles that, IExecutionContext context, IEnumerable paths) + { + return await Task.Run( + () => + { + ConcurrentQueue newPaths = new ConcurrentQueue(); + Parallel.ForEach(paths, file => { + string newFile = Path.Combine(that.DirectoryPath, Path.GetFileName(file)); + File.Move(file, newFile); + newPaths.Enqueue(newFile); + }); + return new FilePathCollection(newPaths); + }); + } + } + +} diff --git a/Flow.net/IO/GetFiles.cs b/Flow.net/IO/GetFiles.cs deleted file mode 100644 index 30779eb..0000000 --- a/Flow.net/IO/GetFiles.cs +++ /dev/null @@ -1,23 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading.Tasks; - -namespace Flow.IO -{ - public class GetFiles : PipelineAction - { - public string DirectoryPath { get; set; } - public string SearchPattern { get; set; } = "*"; - public SearchOption SearchOption { get; set; } = SearchOption.TopDirectoryOnly; - - protected override async Task DefaultHandlerAsync(IExecutionContext context, IPayload input) - { - var directory = Format(DirectoryPath, context, NullResult.Instance, this); - var searchPattern = Format(SearchPattern, context, NullResult.Instance, this); - var files = Directory.GetFiles(directory, searchPattern, SearchOption); - return await Task.FromResult(new FilePathCollection(files)); - } - } -} diff --git a/Flow.net/IO/MoveFiles.cs b/Flow.net/IO/MoveFiles.cs new file mode 100644 index 0000000..e0b8d59 --- /dev/null +++ b/Flow.net/IO/MoveFiles.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using System.ComponentModel; +using System.Collections.Concurrent; + +namespace Flow.IO +{ + //public class MoveFiles : PipelineAction + //{ + // public string Directory { get; set; } + // public MoveFiles() + // { + // // Why use value collection and file path? + // // benefits of having a FilePathCollection? only I can think of is allowing stronger config valdiations, would only be useful for a flow.gui interface, but that's a worthy cause + // SetTypeHandler>(async (context, input) => await Handler(this, context, input)); + // SetTypeHandler(async (context, input) => await Handler(this, context, input)); + // SetTypeHandler(async (context, input) => await Handler(this, context, input)); + // } + + // private static async Task Handler(MoveFiles that, IExecutionContext context, PayloadCollection paths) + // { return await Handler(that, context, paths.Cast().ToArray()); } + + // private static async Task Handler(MoveFiles that, IExecutionContext context, params FilePath[] paths) + // { return await Handler(that, context, paths.Select(p => p.Path).ToArray()); } + + // private static async Task Handler(MoveFiles that, IExecutionContext context, IEnumerable paths) + // { + // return await Task.Run( + // () => + // { + // ConcurrentQueue newPaths = new ConcurrentQueue(); + // Parallel.ForEach(paths, file => { + // string newFile = Path.Combine(that.Directory, Path.GetFileNameWithoutExtension(file)); + // File.Move(file, newFile); + // newPaths.Enqueue(newFile); + // }); + // return new FilePathCollection(newPaths); + // }); + // } + //} +} diff --git a/Flow.net/Logging/TraceLogger.cs b/Flow.net/Logging/TraceLogger.cs new file mode 100644 index 0000000..836dee8 --- /dev/null +++ b/Flow.net/Logging/TraceLogger.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Flow; +using Flow.Logging; + +namespace FlowTest +{ + + public class TraceLogger : ILogger + { + public async Task LogErrorAsync(string message) + => await Task.Run(() => Trace.TraceError(message)); + + public async Task LogInfoAsync(string message) + => await Task.Run(() => Trace.TraceInformation(message)); + + public async Task LogWarningAsync(string message) + => await Task.Run(() => Trace.TraceWarning(message)); + } +} diff --git a/Flow.net/PipelineAction.cs b/Flow.net/PipelineAction.cs index 7c5ba02..70412aa 100644 --- a/Flow.net/PipelineAction.cs +++ b/Flow.net/PipelineAction.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.ComponentModel; using System.Data; using System.Linq; using System.Text.Json.Serialization; @@ -15,6 +16,14 @@ private readonly IDictionary ExecuteAsync(IExecutionContext context) await context.LogInfoAsync($"{Name}:{Id} executing"); var result = await GetFormatter(type)(context, input); await context.LogInfoAsync($"{Name}:{Id} compleated"); - return result; + return await SetActionAccessibilityAsync(context, result); + //return result; } catch (Exception ex) { @@ -84,5 +94,18 @@ private static SmartFormat.SmartFormatter CreateDefaultFormater() formatter.Settings.ConvertCharacterStringLiterals = false; return formatter; } + + + + private async Task SetActionAccessibilityAsync(IExecutionContext context, IPayload result) + { + if (!(ScopedName is null)) + context.Scope[ScopedName] = result; + + if (ContextLogged) + await context.LogInfoAsync(context.Serialize(true)); + + return result; + } } } diff --git a/FlowTest/PipelineTest.cs b/FlowTest/PipelineTest.cs index fe22d76..eb55d1e 100644 --- a/FlowTest/PipelineTest.cs +++ b/FlowTest/PipelineTest.cs @@ -5,25 +5,33 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Newtonsoft.Json; using System; +using System.IO; using System.Diagnostics; using System.Threading.Tasks; namespace FlowTest { + public static class Config + { + public static string ProjectDir => Directory.GetParent(Environment.CurrentDirectory).Parent.Parent.FullName; + public static string TestInputDir => Path.Combine(new string[] { ProjectDir, "TestMaterial", "Input" }); + public static string TestTargetDir => Path.Combine(new string[] { ProjectDir, "TestMaterial", "Output" }); + } + + [TestClass] public class PipelineTest { protected static readonly ILogger Logger = new TraceLogger(); [TestMethod] - public async Task RunTest() + public async Task FileScopingTest() { - new SqlBulkLoadCsv() { }; var builder = new PipelineBuilder(Logger); var result = await builder.StartWith(op => { - op.SearchPattern = "*.cs"; - op.DirectoryPath = @"C:\Projects\Flow.net - GitHub"; + op.DirectoryPath = Config.TestInputDir; + op.SearchPattern = "*.csv"; op.SearchOption = System.IO.SearchOption.AllDirectories; }) .ContinueWith(op => @@ -37,20 +45,199 @@ public async Task RunTest() op.Actions = builder.StartWith().Create().Actions; }) .ExecuteAsync(); + + Trace.WriteLine(JsonConvert.SerializeObject(result, Formatting.Indented)); + Assert.IsTrue(result is PayloadCollection); + } + + + + [TestMethod] + public async Task CopyFilesTest() + { + // Confirm files exist in test input dir + string testFileIn1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileIn2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + Assert.IsTrue(File.Exists(testFileIn1)); + Assert.IsTrue(File.Exists(testFileIn2)); + + // Confirm files don't exist in test output dir + string testFileOut1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileOut2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + if (File.Exists(testFileOut1)) { File.Delete(testFileOut1); } + if (File.Exists(testFileOut2)) { File.Delete(testFileOut2); } + + Assert.IsFalse(File.Exists(testFileOut1)); + Assert.IsFalse(File.Exists(testFileOut2)); + + + // Test Copy Files + var builder = new PipelineBuilder(Logger); + var result = await builder.StartWith(op => + { + op.DirectoryPath = Config.TestInputDir; + op.SearchPattern = "*.csv"; + op.SearchOption = System.IO.SearchOption.AllDirectories; + }) + .ContinueWith(op => + { + op.DirectoryPath = Config.TestTargetDir; + }) + .ExecuteAsync(); + + Trace.WriteLine(JsonConvert.SerializeObject(result, Formatting.Indented)); + Assert.IsTrue(result is PayloadCollection); + + + // Confirm files now exist in test output dir + Assert.IsTrue(File.Exists(testFileOut1)); + Assert.IsTrue(File.Exists(testFileOut2)); + + } + + [TestMethod] + public async Task MoveFilesTest() + { + + + // Confirm files exist in test input dir + string testFileIn1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileIn2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + Assert.IsTrue(File.Exists(testFileIn1)); + Assert.IsTrue(File.Exists(testFileIn2)); + + // Confirm files don't exist in test output dir + string testFileOut1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileOut2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + if (File.Exists(testFileOut1)) { File.Delete(testFileOut1); } + if (File.Exists(testFileOut2)) { File.Delete(testFileOut2); } + + Assert.IsFalse(File.Exists(testFileOut1)); + Assert.IsFalse(File.Exists(testFileOut2)); + + + // Test Move Files + var builder = new PipelineBuilder(Logger); + var result = await builder.StartWith(op => + { + op.DirectoryPath = Config.TestInputDir; + op.SearchPattern = "*.csv"; + op.SearchOption = System.IO.SearchOption.AllDirectories; + }) + .ContinueWith(op => + { + op.DirectoryPath = Config.TestTargetDir; + }) + .ExecuteAsync(); + Trace.WriteLine(JsonConvert.SerializeObject(result, Formatting.Indented)); Assert.IsTrue(result is PayloadCollection); + + + // Confirm files now exist in test output dir + Assert.IsTrue(File.Exists(testFileOut1)); + Assert.IsTrue(File.Exists(testFileOut2)); + + // Confirm files now don't exist in test input dir + Assert.IsFalse(File.Exists(testFileIn1)); + Assert.IsFalse(File.Exists(testFileIn2)); + + + // Move files back + var builderClean = new PipelineBuilder(Logger); + var resultClean = await builder.StartWith(op => + { + op.DirectoryPath = Config.TestTargetDir; + op.SearchPattern = "*.csv"; + op.SearchOption = System.IO.SearchOption.AllDirectories; + }) + .ContinueWith(op => + { + op.DirectoryPath = Config.TestInputDir; + }) + .ExecuteAsync(); + + + // Confirm files now exist in test input dir + Assert.IsTrue(File.Exists(testFileIn1)); + Assert.IsTrue(File.Exists(testFileIn2)); + + // Confirm files now don't exist in test output dir + Assert.IsFalse(File.Exists(testFileOut1)); + Assert.IsFalse(File.Exists(testFileOut2)); + } - class TraceLogger : ILogger + + + [TestMethod] + public async Task DeleteFilesTest() { - public async Task LogErrorAsync(string message) - => await Task.Run(() => Trace.TraceError(message)); - public async Task LogInfoAsync(string message) - => await Task.Run(() => Trace.TraceInformation(message)); - public async Task LogWarningAsync(string message) - => await Task.Run(() => Trace.TraceWarning(message)); + // Confirm files exist in test input dir + string testFileIn1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileIn2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + Assert.IsTrue(File.Exists(testFileIn1)); + Assert.IsTrue(File.Exists(testFileIn2)); + + // Confirm files don't exist in test output dir + string testFileOut1 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_1.csv" }); + string testFileOut2 = Path.Combine(new string[] { Config.TestInputDir, "sample_data_2.csv" }); + + if (File.Exists(testFileOut1)) { File.Delete(testFileOut1); } + if (File.Exists(testFileOut2)) { File.Delete(testFileOut2); } + + Assert.IsFalse(File.Exists(testFileOut1)); + Assert.IsFalse(File.Exists(testFileOut2)); + + + // Copy Files + var builder = new PipelineBuilder(Logger); + var copyFilesForDelete = await builder.StartWith(op => + { + op.DirectoryPath = Config.TestInputDir; + op.SearchPattern = "*.csv"; + op.SearchOption = System.IO.SearchOption.AllDirectories; + }) + .ContinueWith(op => + { + op.DirectoryPath = Config.TestTargetDir; + }) + .ExecuteAsync(); + + // Confirm files to delete exist + Assert.IsTrue(File.Exists(testFileOut1)); + Assert.IsTrue(File.Exists(testFileOut2)); + + + // Test Delete Files + var result = await builder.StartWith(op => + { + op.DirectoryPath = Config.TestInputDir; + op.SearchPattern = "*.csv"; + op.SearchOption = System.IO.SearchOption.AllDirectories; + }) + .ContinueWith() + .ExecuteAsync(); + + Trace.WriteLine(JsonConvert.SerializeObject(result, Formatting.Indented)); + Assert.IsTrue(result is NullResult); + + + // Confirm files were deleted + Assert.IsFalse(File.Exists(testFileOut1)); + Assert.IsFalse(File.Exists(testFileOut2)); + + + } + + } } \ No newline at end of file diff --git a/FlowTest/TestMaterial/Input/sample_data.csv b/FlowTest/TestMaterial/Input/sample_data.csv new file mode 100644 index 0000000..8c256dc --- /dev/null +++ b/FlowTest/TestMaterial/Input/sample_data.csv @@ -0,0 +1,4 @@ +id,name,account,mystery +1,van,19.89, +2,man,1.00000001,12 +9999999,jan,52.89,ABCDEFGHIJKLMNOP diff --git a/FlowTest/TestMaterial/Input/sample_data_2.csv b/FlowTest/TestMaterial/Input/sample_data_2.csv new file mode 100644 index 0000000..bcfa637 --- /dev/null +++ b/FlowTest/TestMaterial/Input/sample_data_2.csv @@ -0,0 +1,4 @@ +id,name,account,mystery,size +1,van,19.89,1.2 +2,man,1.00000001,12,NaN +9999999,jan,52.89,ABCDEFGHIJKLMNOP,NaN \ No newline at end of file diff --git a/FlowTest/TestMaterial/Output/x.csv b/FlowTest/TestMaterial/Output/x.csv new file mode 100644 index 0000000..bcfa637 --- /dev/null +++ b/FlowTest/TestMaterial/Output/x.csv @@ -0,0 +1,4 @@ +id,name,account,mystery,size +1,van,19.89,1.2 +2,man,1.00000001,12,NaN +9999999,jan,52.89,ABCDEFGHIJKLMNOP,NaN \ No newline at end of file