From 4f315531946272909eff81d32d4883ea2f1733f3 Mon Sep 17 00:00:00 2001 From: Weiwei Chen Date: Thu, 3 Jul 2014 18:12:03 -0700 Subject: [PATCH] last commit before submission of fgcs --- .../examples/WorkflowSimBasicExample2.java | 316 +++++++++++++ .../balancing/BalancedClusteringExample1.java | 27 +- .../balancing/BalancedClusteringExample2.java | 421 ++++++++++++++++++ .../balancing/BalancedClusteringExample3.java | 415 +++++++++++++++++ .../FaultTolerantClusteringExample3.java | 2 +- sources/org/workflowsim/ClusteringEngine.java | 15 + .../org/workflowsim/DatacenterExtended.java | 2 +- sources/org/workflowsim/WorkflowPlanner.java | 10 +- .../clustering/AFJSClustering.java | 149 +++++++ .../clustering/DFJSClustering.java | 131 ++++++ .../balancing/BalancedClustering.java | 61 +-- .../balancing/methods/BalancingMethod.java | 25 +- .../methods/HorizontalDistanceBalancing.java | 57 ++- .../methods/HorizontalImpactBalancing.java | 116 +++-- .../balancing/methods/VerticalBalancing.java | 2 +- .../DataAwareSchedulingAlgorithm.java | 6 +- .../utils/ClusteringParameters.java | 50 ++- 17 files changed, 1679 insertions(+), 126 deletions(-) create mode 100644 examples/org/workflowsim/examples/WorkflowSimBasicExample2.java create mode 100644 examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample2.java create mode 100644 examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample3.java create mode 100644 sources/org/workflowsim/clustering/AFJSClustering.java create mode 100644 sources/org/workflowsim/clustering/DFJSClustering.java diff --git a/examples/org/workflowsim/examples/WorkflowSimBasicExample2.java b/examples/org/workflowsim/examples/WorkflowSimBasicExample2.java new file mode 100644 index 00000000..1a3ee367 --- /dev/null +++ b/examples/org/workflowsim/examples/WorkflowSimBasicExample2.java @@ -0,0 +1,316 @@ +/** + * Copyright 2012-2013 University Of Southern California + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.workflowsim.examples; + +import java.io.File; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.LinkedList; +import java.util.List; +import org.cloudbus.cloudsim.Cloudlet; +import org.cloudbus.cloudsim.CloudletSchedulerSpaceShared; +import org.cloudbus.cloudsim.DatacenterCharacteristics; +import org.cloudbus.cloudsim.Host; +import org.cloudbus.cloudsim.Log; +import org.cloudbus.cloudsim.Pe; +import org.cloudbus.cloudsim.Storage; +import org.cloudbus.cloudsim.VmAllocationPolicySimple; +import org.cloudbus.cloudsim.VmSchedulerTimeShared; +import org.cloudbus.cloudsim.core.CloudSim; +import org.cloudbus.cloudsim.provisioners.BwProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.PeProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.RamProvisionerSimple; +import org.workflowsim.ClusterStorage; +import org.workflowsim.CondorVM; +import org.workflowsim.DatacenterExtended; +import org.workflowsim.Job; +import org.workflowsim.WorkflowEngine; +import org.workflowsim.WorkflowPlanner; +import org.workflowsim.failure.FailureGenerator; +import org.workflowsim.failure.FailureMonitor; +import org.workflowsim.utils.ClusteringParameters; +import org.workflowsim.utils.OverheadParameters; +import org.workflowsim.utils.Parameters; +import org.workflowsim.utils.ReplicaCatalog; + +/** + * This WorkflowSimExample creates a workflow planner, a workflow engine, and + * one schedulers, one data centers and 20 vms. You should change daxPath at least. + * You may change other parameters as well. + * + * @author Weiwei Chen + * @since WorkflowSim Toolkit 1.0 + * @date Apr 9, 2013 + */ +public class WorkflowSimBasicExample2 { + + private static List createVM(int userId, int vms) { + + //Creates a container to store VMs. This list is passed to the broker later + LinkedList list = new LinkedList(); + + //VM Parameters + long size = 10000; //image size (MB) + int ram = 512; //vm memory (MB) + int mips = 1000; + long bw = 1000; + int pesNumber = 1; //number of cpus + String vmm = "Xen"; //VMM name + + //create VMs + CondorVM[] vm = new CondorVM[vms]; + + for (int i = 0; i < vms; i++) { + double ratio = 1.0; + vm[i] = new CondorVM(i, userId, mips * ratio, pesNumber, ram, bw, size, vmm, new CloudletSchedulerSpaceShared()); + list.add(vm[i]); + } + + return list; + } + + ////////////////////////// STATIC METHODS /////////////////////// + /** + * Creates main() to run this example + * This example has only one datacenter and one storage + */ + public static void main(String[] args) { + + + try { + // First step: Initialize the WorkflowSim package. + + /** + * However, the exact number of vms may not necessarily be vmNum If + * the data center or the host doesn't have sufficient resources the + * exact vmNum would be smaller than that. Take care. 
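+             * The outcome depends on the hosts built in createDatacenter() and the
+             * VmAllocationPolicySimple that places VMs onto them.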
+ */ + int vmNum = 20;//number of vms; + /** + * Should change this based on real physical path + */ + String daxPath = "/Users/chenweiwei/Work/WorkflowSim-1.0/config/dax/Montage_100.xml"; + if(daxPath == null){ + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + File daxFile = new File(daxPath); + if(!daxFile.exists()){ + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + /* + * Use default Fault Tolerant Parameters + */ + Parameters.FTCMonitor ftc_monitor = Parameters.FTCMonitor.MONITOR_NONE; + Parameters.FTCFailure ftc_failure = Parameters.FTCFailure.FAILURE_NONE; + Parameters.FTCluteringAlgorithm ftc_method = null; + + /** + * Since we are using MINMIN scheduling algorithm, the planning algorithm should be INVALID + * such that the planner would not override the result of the scheduler + */ + Parameters.SchedulingAlgorithm sch_method = Parameters.SchedulingAlgorithm.MINMIN; + Parameters.PlanningAlgorithm pln_method = Parameters.PlanningAlgorithm.INVALID; + ReplicaCatalog.FileSystem file_system = ReplicaCatalog.FileSystem.SHARED; + + /** + * No overheads + */ + OverheadParameters op = new OverheadParameters(0, null, null, null, null, 0);; + + /** + * No Clustering + */ + ClusteringParameters.ClusteringMethod method = ClusteringParameters.ClusteringMethod.NONE; + ClusteringParameters cp = new ClusteringParameters(0, 0, method, null); + + /** + * Initialize static parameters + */ + Parameters.init(ftc_method, ftc_monitor, ftc_failure, + null, vmNum, daxPath, null, + null, op, cp, sch_method, pln_method, + null, 0); + ReplicaCatalog.init(file_system); + + FailureMonitor.init(); + FailureGenerator.init(); + + // before creating any entities. + int num_user = 1; // number of grid users + Calendar calendar = Calendar.getInstance(); + boolean trace_flag = false; // mean trace events + + // Initialize the CloudSim library + CloudSim.init(num_user, calendar, trace_flag); + + DatacenterExtended datacenter0 = createDatacenter("Datacenter_0"); + + /** + * Create a WorkflowPlanner with one schedulers. + */ + WorkflowPlanner wfPlanner = new WorkflowPlanner("planner_0", 1); + /** + * Create a WorkflowEngine. + */ + WorkflowEngine wfEngine = wfPlanner.getWorkflowEngine(); + /** + * Create a list of VMs.The userId of a vm is basically the id of the scheduler + * that controls this vm. + */ + List vmlist0 = createVM(wfEngine.getSchedulerId(0), Parameters.getVmNum()); + + /** + * Submits this list of vms to this WorkflowEngine. + */ + wfEngine.submitVmList(vmlist0, 0); + + /** + * Binds the data centers with the scheduler. + */ + wfEngine.bindSchedulerDatacenter(datacenter0.getId(), 0); + + CloudSim.startSimulation(); + + + List outputList0 = wfEngine.getJobsReceivedList(); + + CloudSim.stopSimulation(); + + printJobList(outputList0); + + + } catch (Exception e) { + Log.printLine("The simulation has been terminated due to an unexpected error"); + } + } + + private static DatacenterExtended createDatacenter(String name) { + + // Here are the steps needed to create a PowerDatacenter: + // 1. We need to create a list to store one or more + // Machines + List hostList = new ArrayList(); + + // 2. A Machine contains one or more PEs or CPUs/Cores. Therefore, should + // create a list to store these PEs before creating + // a Machine. + for (int i = 1; i <= 20; i++) { + List peList1 = new ArrayList(); + int mips = 2000; + // 3. Create PEs and add these into the list. 
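+            // (this example adds only two PEs per host below; extend the list for more cores)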
+ //for a quad-core machine, a list of 4 PEs is required: + peList1.add(new Pe(0, new PeProvisionerSimple(mips))); // need to store Pe id and MIPS Rating + peList1.add(new Pe(1, new PeProvisionerSimple(mips))); + + int hostId = 0; + int ram = 2048; //host memory (MB) + long storage = 1000000; //host storage + int bw = 10000; + hostList.add( + new Host( + hostId, + new RamProvisionerSimple(ram), + new BwProvisionerSimple(bw), + storage, + peList1, + new VmSchedulerTimeShared(peList1))); // This is our first machine + hostId++; + + } + + // 5. Create a DatacenterCharacteristics object that stores the + // properties of a data center: architecture, OS, list of + // Machines, allocation policy: time- or space-shared, time zone + // and its price (G$/Pe time unit). + String arch = "x86"; // system architecture + String os = "Linux"; // operating system + String vmm = "Xen"; + double time_zone = 10.0; // time zone this resource located + double cost = 3.0; // the cost of using processing in this resource + double costPerMem = 0.05; // the cost of using memory in this resource + double costPerStorage = 0.1; // the cost of using storage in this resource + double costPerBw = 0.1; // the cost of using bw in this resource + LinkedList storageList = new LinkedList(); //we are not adding SAN devices by now + DatacenterExtended datacenter = null; + + + DatacenterCharacteristics characteristics = new DatacenterCharacteristics( + arch, os, vmm, hostList, time_zone, cost, costPerMem, costPerStorage, costPerBw); + + + // 6. Finally, we need to create a cluster storage object. + /** + * The bandwidth within a data center. + */ + double intraBandwidth = 1.5e7;// the number comes from the futuregrid site, you can specify your bw + + try { + ClusterStorage s1 = new ClusterStorage(name, 1e12); + + // The bandwidth within a data center + s1.setBandwidth("local", intraBandwidth); + // The bandwidth to the source site + s1.setBandwidth("source", intraBandwidth); + storageList.add(s1); + datacenter = new DatacenterExtended(name, characteristics, new VmAllocationPolicySimple(hostList), storageList, 0); + } catch (Exception e) { + } + + return datacenter; + } + + /** + * Prints the job objects + * + * @param list list of jobs + */ + private static void printJobList(List list) { + int size = list.size(); + Job job; + + String indent = " "; + Log.printLine(); + Log.printLine("========== OUTPUT =========="); + Log.printLine("Cloudlet ID" + indent + "STATUS" + indent + + "Data center ID" + indent + "VM ID" + indent + indent + "Time" + indent + "Start Time" + indent + "Finish Time" + indent + "Depth"); + + DecimalFormat dft = new DecimalFormat("###.##"); + for (int i = 0; i < size; i++) { + job = list.get(i); + Log.print(indent + job.getCloudletId() + indent + indent); + + if (job.getCloudletStatus() == Cloudlet.SUCCESS) { + Log.print("SUCCESS"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth() + indent + job.getProcessingCost()); + } else if (job.getCloudletStatus() == Cloudlet.FAILED) { + Log.print("FAILED"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + 
indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth()); + } + } + + } +} diff --git a/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample1.java b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample1.java index b381ffee..6cf021cb 100644 --- a/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample1.java +++ b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample1.java @@ -98,11 +98,12 @@ public static void main(String[] args) { * delete in the future * */ - String code = "i"; - String daxPath = "/Users/chenweiwei/Research/balanced_clustering/generator/BharathiPaper/Fake_1.xml"; - double intraBandwidth = 1.5e5; - double c_delay = 0, q_delay = 0, e_delay = 0, p_delay = 0; - int interval = 0; + String code = "n"; + String daxPath = "/Users/chenweiwei/NetBeansProjects/BhWorkflowGen/Inspiral_8000.xml"; + double intraBandwidth = 1.5e7; + double c_delay = 0, q_delay = 100, e_delay = 100, p_delay = 0; + int interval = 5; + int vmNum = 200;//number of vms; for(int i = 0; i < args.length; i ++){ char key = args[i].charAt(1); @@ -131,6 +132,9 @@ public static void main(String[] args) { case 'i': interval = Integer.parseInt(args[++i]); break; + case 'v': + vmNum = Integer.parseInt(args[++i]); + break; } } // First step: Initialize the WorkflowSim package. @@ -140,7 +144,7 @@ public static void main(String[] args) { * the data center or the host doesn't have sufficient resources the * exact vmNum would be smaller than that. Take care. */ - int vmNum = 20;//number of vms; + /** * Should change this based on real physical path */ @@ -206,7 +210,7 @@ public static void main(String[] args) { * i: Horizontal Impact Factor Balancing (HIFB) * h: Horizontal Random Balancing , the original horizontal clustering */ - ClusteringParameters cp = new ClusteringParameters(20, 0, method, code); + ClusteringParameters cp = new ClusteringParameters(vmNum, 0, method, code); /** @@ -229,7 +233,7 @@ public static void main(String[] args) { // Initialize the CloudSim library CloudSim.init(num_user, calendar, trace_flag); - DatacenterExtended datacenter0 = createDatacenter("Datacenter_0", intraBandwidth); + DatacenterExtended datacenter0 = createDatacenter("Datacenter_0", intraBandwidth, vmNum); /** * Create a WorkflowPlanner with one schedulers. @@ -270,12 +274,11 @@ public static void main(String[] args) { } } - private static DatacenterExtended createDatacenter(String name , double intraBandwidth) { + private static DatacenterExtended createDatacenter(String name , double intraBandwidth, int vmNumber) { // Here are the steps needed to create a PowerDatacenter: // 1. We need to create a list to store one or more // Machines - int vmNumber = 20; List hostList = new ArrayList(); // 2. A Machine contains one or more PEs or CPUs/Cores. 
Therefore, should @@ -290,8 +293,8 @@ private static DatacenterExtended createDatacenter(String name , double intraBan peList1.add(new Pe(1, new PeProvisionerSimple(mips))); int hostId = 0; - int ram = 2048; //host memory (MB) - long storage = 1000000; //host storage + int ram = 2048 * vmNumber / 20; //host memory (MB) + long storage = 1000000 * vmNumber / 20; //host storage int bw = 10000; hostList.add( new Host( diff --git a/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample2.java b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample2.java new file mode 100644 index 00000000..f882113d --- /dev/null +++ b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample2.java @@ -0,0 +1,421 @@ +/** + * Copyright 2012-2013 University Of Southern California + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.workflowsim.examples.clustering.balancing; + +import java.io.File; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.cloudbus.cloudsim.Cloudlet; +import org.cloudbus.cloudsim.CloudletSchedulerSpaceShared; +import org.cloudbus.cloudsim.DatacenterCharacteristics; +import org.cloudbus.cloudsim.Host; +import org.cloudbus.cloudsim.Log; +import org.cloudbus.cloudsim.Pe; +import org.cloudbus.cloudsim.Storage; +import org.cloudbus.cloudsim.VmAllocationPolicySimple; +import org.cloudbus.cloudsim.VmSchedulerTimeShared; +import org.cloudbus.cloudsim.core.CloudSim; +import org.cloudbus.cloudsim.provisioners.BwProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.PeProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.RamProvisionerSimple; +import org.workflowsim.CondorVM; +import org.workflowsim.DatacenterExtended; +import org.workflowsim.DistributedClusterStorage; +import org.workflowsim.Job; +import org.workflowsim.WorkflowEngine; +import org.workflowsim.WorkflowPlanner; +import org.workflowsim.failure.FailureGenerator; +import org.workflowsim.failure.FailureMonitor; +import org.workflowsim.utils.ClusteringParameters; +import org.workflowsim.utils.OverheadParameters; +import org.workflowsim.utils.Parameters; +import org.workflowsim.utils.ReplicaCatalog; + +/** + * This BalancedClusteringExample2 is using balanced horizontal clustering or + * more specifically using horizontal runtime balancing. + * + * @author Weiwei Chen + * @since WorkflowSim Toolkit 1.0 + * @date Dec 29, 2013 + */ +public class BalancedClusteringExample2 { + + private static List createVM(int userId, int vms) { + + //Creates a container to store VMs. 
This list is passed to the broker later + LinkedList list = new LinkedList(); + + //VM Parameters + long size = 10000; //image size (MB) + int ram = 512; //vm memory (MB) + int mips = 1000; + long bw = 1000; + int pesNumber = 1; //number of cpus + String vmm = "Xen"; //VMM name + + //create VMs + CondorVM[] vm = new CondorVM[vms]; + + for (int i = 0; i < vms; i++) { + double ratio = 1.0; + vm[i] = new CondorVM(i, userId, mips * ratio, pesNumber, ram, bw, size, vmm, new CloudletSchedulerSpaceShared()); + list.add(vm[i]); + } + + return list; + } + + ////////////////////////// STATIC METHODS /////////////////////// + /** + * Creates main() to run this example This example has only one datacenter + * and one storage + */ + public static void main(String[] args) { + + + try { + + /** + * delete in the future + * + */ + + + /** + * However, the exact number of vms may not necessarily be vmNum If + * the data center or the host doesn't have sufficient resources the + * exact vmNum would be smaller than that. Take care. + */ + int vmNum = 20;//number of vms; + int jobNum = 20; + String code = "r"; + String daxPath = "/Users/chenweiwei/Research/balanced_clustering/data/scan/LIGO.n.700.1.dax"; + double intraBandwidth = 1.5e7; + double c_delay = 0, q_delay = 100, e_delay = 100, p_delay = 0; + int interval = 5; + double communication_graunlarity = 0.0, computation_granularity = 0.0; + for (int i = 0; i < args.length; i++) { + char key = args[i].charAt(1); + switch (key) { + case 'c': + code = args[++i]; + break; + case 'd': + daxPath = args[++i]; + break; + case 'b': + intraBandwidth = Double.parseDouble(args[++i]); + break; + case 'l': + c_delay = Double.parseDouble(args[++i]); + break; + case 'q': + q_delay = Double.parseDouble(args[++i]); + break; + case 'e': + e_delay = Double.parseDouble(args[++i]); + break; + case 'p': + p_delay = Double.parseDouble(args[++i]); + break; + case 'i': + interval = Integer.parseInt(args[++i]); + break; + case 's': + computation_granularity = Double.parseDouble(args[++i]); + break; + case 'm': + communication_graunlarity = Double.parseDouble(args[++i]); + break; + case 'v': + vmNum = Integer.parseInt(args[++i]); + break; + case 'j': + jobNum = Integer.parseInt(args[++i]); + break; + } + } + // First step: Initialize the WorkflowSim package. 
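+            // Command-line flags parsed above (semantics taken from the variables they set):
+            //   -c clustering/balancing code, -d DAX file path, -b intra-datacenter bandwidth,
+            //   -l clustering delay, -q queue delay, -e engine delay, -p postscript delay,
+            //   -i overhead interval, -s computation granularity, -m communication granularity,
+            //   -v number of VMs, -j number of clustered jobs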
+ + + /** + * Should change this based on real physical path + */ + //String daxPath = "/Users/chenweiwei/Research/balanced_clustering/generator/BharathiPaper/Fake_1.xml"; + if (daxPath == null) { + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + File daxFile = new File(daxPath); + if (!daxFile.exists()) { + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + /* + * Use default Fault Tolerant Parameters + */ + Parameters.FTCMonitor ftc_monitor = Parameters.FTCMonitor.MONITOR_NONE; + Parameters.FTCFailure ftc_failure = Parameters.FTCFailure.FAILURE_NONE; + Parameters.FTCluteringAlgorithm ftc_method = null; + + /** + * Since we are using MINMIN scheduling algorithm, the planning + * algorithm should be INVALID such that the planner would not + * override the result of the scheduler + */ + Parameters.SchedulingAlgorithm sch_method = Parameters.SchedulingAlgorithm.DATA; + Parameters.PlanningAlgorithm pln_method = Parameters.PlanningAlgorithm.INVALID; + ReplicaCatalog.FileSystem file_system = ReplicaCatalog.FileSystem.LOCAL; + + /** + * clustering delay must be added, if you don't need it, you can set + * all the clustering delay to be zero, but not null + */ + Map clusteringDelay = new HashMap(); + Map queueDelay = new HashMap(); + Map postscriptDelay = new HashMap(); + Map engineDelay = new HashMap(); + /** + * application has at most 11 horizontal levels + */ + int maxLevel = 11; + for (int level = 0; level < maxLevel; level++) { + clusteringDelay.put(level, c_delay); + queueDelay.put(level, q_delay); + postscriptDelay.put(level, p_delay); + engineDelay.put(level, e_delay); + } + // Add clustering delay to the overhead parameters + /** + * Map wed_delay, Map queue_delay, + * Map post_delay, Map + * cluster_delay, + */ + OverheadParameters op = new OverheadParameters(interval, engineDelay, queueDelay, postscriptDelay, clusteringDelay, 0);; + + /** + * Balanced Clustering + */ + ClusteringParameters.ClusteringMethod method = null; + if (code.equalsIgnoreCase("m")) { + method = ClusteringParameters.ClusteringMethod.DFJS; + } else if (code.equalsIgnoreCase("a")) { + method = ClusteringParameters.ClusteringMethod.AFJS; + } else if (code.equalsIgnoreCase("n")){ + method = ClusteringParameters.ClusteringMethod.NONE; + } + else { + method = ClusteringParameters.ClusteringMethod.BALANCED; + } + /** + * r: Horizontal Runtime Balancing (HRB) d: Horizontal Distance + * Balancing (HDB) i: Horizontal Impact Factor Balancing (HIFB) h: + * Horizontal Random Balancing , the original horizontal clustering + */ + ClusteringParameters cp = new ClusteringParameters(jobNum, 0, method, code); + if (communication_graunlarity != 0.0) { + cp.setGranularityDataSize(communication_graunlarity); + } + if (computation_granularity != 0.0) { + cp.setGranularityTimeSize(computation_granularity); + } + + /** + * Initialize static parameters + */ + Parameters.init(ftc_method, ftc_monitor, ftc_failure, + null, vmNum, daxPath, null, + null, op, cp, sch_method, pln_method, + null, 0); + ReplicaCatalog.init(file_system); + + FailureMonitor.init(); + FailureGenerator.init(); + + // before creating any entities. 
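+            // CloudSim.init below has to be called before any simulation entities
+            // (datacenters, brokers, VMs) are created.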
+ int num_user = 1; // number of grid users + Calendar calendar = Calendar.getInstance(); + boolean trace_flag = false; // mean trace events + + // Initialize the CloudSim library + CloudSim.init(num_user, calendar, trace_flag); + + DatacenterExtended datacenter0 = createDatacenter("Datacenter_0", intraBandwidth, vmNum); + + /** + * Create a WorkflowPlanner with one schedulers. + */ + WorkflowPlanner wfPlanner = new WorkflowPlanner("planner_0", 1); + /** + * Create a WorkflowEngine. + */ + WorkflowEngine wfEngine = wfPlanner.getWorkflowEngine(); + /** + * Create a list of VMs.The userId of a vm is basically the id of + * the scheduler that controls this vm. + */ + List vmlist0 = createVM(wfEngine.getSchedulerId(0), Parameters.getVmNum()); + + /** + * Submits this list of vms to this WorkflowEngine. + */ + wfEngine.submitVmList(vmlist0, 0); + + /** + * Binds the data centers with the scheduler. + */ + wfEngine.bindSchedulerDatacenter(datacenter0.getId(), 0); + + CloudSim.startSimulation(); + + + List outputList0 = wfEngine.getJobsReceivedList(); + + CloudSim.stopSimulation(); + + printJobList(outputList0); + + + } catch (Exception e) { + Log.printLine("The simulation has been terminated due to an unexpected error"); + } + } + + private static DatacenterExtended createDatacenter(String name, double intraBandwidth , int vmNumber) { + + // Here are the steps needed to create a PowerDatacenter: + // 1. We need to create a list to store one or more + // Machines + //int vmNumber = 20; + List hostList = new ArrayList(); + + // 2. A Machine contains one or more PEs or CPUs/Cores. Therefore, should + // create a list to store these PEs before creating + // a Machine. + for (int i = 1; i <= vmNumber; i++) { + List peList1 = new ArrayList(); + int mips = 2000; + // 3. Create PEs and add these into the list. + //for a quad-core machine, a list of 4 PEs is required: + peList1.add(new Pe(0, new PeProvisionerSimple(mips))); // need to store Pe id and MIPS Rating + peList1.add(new Pe(1, new PeProvisionerSimple(mips))); + + int hostId = 0; + int ram = 2048; //host memory (MB) + long storage = 1000000; //host storage + int bw = 10000; + hostList.add( + new Host( + hostId, + new RamProvisionerSimple(ram), + new BwProvisionerSimple(bw), + storage, + peList1, + new VmSchedulerTimeShared(peList1))); // This is our first machine + hostId++; + + } + + // 5. Create a DatacenterCharacteristics object that stores the + // properties of a data center: architecture, OS, list of + // Machines, allocation policy: time- or space-shared, time zone + // and its price (G$/Pe time unit). + String arch = "x86"; // system architecture + String os = "Linux"; // operating system + String vmm = "Xen"; + double time_zone = 10.0; // time zone this resource located + double cost = 3.0; // the cost of using processing in this resource + double costPerMem = 0.05; // the cost of using memory in this resource + double costPerStorage = 0.1; // the cost of using storage in this resource + double costPerBw = 0.1; // the cost of using bw in this resource + LinkedList storageList = new LinkedList(); //we are not adding SAN devices by now + DatacenterExtended datacenter = null; + + + DatacenterCharacteristics characteristics = new DatacenterCharacteristics( + arch, os, vmm, hostList, time_zone, cost, costPerMem, costPerStorage, costPerBw); + + + // 6. Finally, we need to create a cluster storage object. + /** + * The bandwidth within a data center. 
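+             * Every ordered VM pair below is assigned intraBandwidth, while the shared
+             * DistributedClusterStorage itself is created with intraBandwidth / 2.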
+ */ + //double intraBandwidth = 1.5e3;// the number comes from the futuregrid site, you can specify your bw + try { + DistributedClusterStorage s1 = new DistributedClusterStorage(name, 1e12, vmNumber, intraBandwidth / 2); + + // The bandwidth from one vm to another vm + for (int source = 0; source < vmNumber; source++) { + for (int destination = 0; destination < vmNumber; destination++) { + if (source == destination) { + continue; + } + s1.setBandwidth(source, destination, intraBandwidth); + } + } + storageList.add(s1); + datacenter = new DatacenterExtended(name, characteristics, new VmAllocationPolicySimple(hostList), storageList, 0); + } catch (Exception e) { + } + + return datacenter; + } + + /** + * Prints the job objects + * + * @param list list of jobs + */ + private static void printJobList(List list) { + int size = list.size(); + Job job; + + String indent = " "; + Log.printLine(); + Log.printLine("========== OUTPUT =========="); + Log.printLine("Cloudlet ID" + indent + "STATUS" + indent + + "Data center ID" + indent + "VM ID" + indent + indent + "Time" + indent + "Start Time" + indent + "Finish Time" + indent + "Depth"); + + DecimalFormat dft = new DecimalFormat("###.##"); + for (int i = 0; i < size; i++) { + job = list.get(i); + Log.print(indent + job.getCloudletId() + indent + indent); + + if (job.getCloudletStatus() == Cloudlet.SUCCESS) { + Log.print("SUCCESS"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth()); + } else if (job.getCloudletStatus() == Cloudlet.FAILED) { + Log.print("FAILED"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth()); + } + } + + } +} diff --git a/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample3.java b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample3.java new file mode 100644 index 00000000..31eaa7e8 --- /dev/null +++ b/examples/org/workflowsim/examples/clustering/balancing/BalancedClusteringExample3.java @@ -0,0 +1,415 @@ +/** + * Copyright 2012-2013 University Of Southern California + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.workflowsim.examples.clustering.balancing; + +import java.io.File; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.cloudbus.cloudsim.Cloudlet; +import org.cloudbus.cloudsim.CloudletSchedulerSpaceShared; +import org.cloudbus.cloudsim.DatacenterCharacteristics; +import org.cloudbus.cloudsim.Host; +import org.cloudbus.cloudsim.Log; +import org.cloudbus.cloudsim.Pe; +import org.cloudbus.cloudsim.Storage; +import org.cloudbus.cloudsim.VmAllocationPolicySimple; +import org.cloudbus.cloudsim.VmSchedulerTimeShared; +import org.cloudbus.cloudsim.core.CloudSim; +import org.cloudbus.cloudsim.provisioners.BwProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.PeProvisionerSimple; +import org.cloudbus.cloudsim.provisioners.RamProvisionerSimple; +import org.workflowsim.CondorVM; +import org.workflowsim.DatacenterExtended; +import org.workflowsim.DistributedClusterStorage; +import org.workflowsim.Job; +import org.workflowsim.WorkflowEngine; +import org.workflowsim.WorkflowPlanner; +import org.workflowsim.failure.FailureGenerator; +import org.workflowsim.failure.FailureMonitor; +import org.workflowsim.utils.ClusteringParameters; +import org.workflowsim.utils.OverheadParameters; +import org.workflowsim.utils.Parameters; +import org.workflowsim.utils.ReplicaCatalog; + +/** + * This BalancedClusteringExample3 is using balanced horizontal clustering or + * more specifically using horizontal runtime balancing. + * + * @author Weiwei Chen + * @since WorkflowSim Toolkit 1.0 + * @date Dec 29, 2013 + */ +public class BalancedClusteringExample3 { + + private static List createVM(int userId, int vms) { + + //Creates a container to store VMs. This list is passed to the broker later + LinkedList list = new LinkedList(); + + //VM Parameters + long size = 10000; //image size (MB) + int ram = 512; //vm memory (MB) + int mips = 1000; + long bw = 1000; + int pesNumber = 1; //number of cpus + String vmm = "Xen"; //VMM name + + //create VMs + CondorVM[] vm = new CondorVM[vms]; + + for (int i = 0; i < vms; i++) { + double ratio = 1.0; + vm[i] = new CondorVM(i, userId, mips * ratio, pesNumber, ram, bw, size, vmm, new CloudletSchedulerSpaceShared()); + list.add(vm[i]); + } + + return list; + } + + ////////////////////////// STATIC METHODS /////////////////////// + /** + * Creates main() to run this example This example has only one datacenter + * and one storage + */ + public static void main(String[] args) { + + + try { + + /** + * However, the exact number of vms may not necessarily be vmNum If + * the data center or the host doesn't have sufficient resources the + * exact vmNum would be smaller than that. Take care. 
+ */ + int vmNum = 20;//number of vms; + int jobNum = 20; + String code = "i"; + String daxPath = "/Users/chenweiwei/Research/balanced_clustering/data/scan-1/GENOME.d.702049732.5.dax"; + double intraBandwidth = 1.5e7; + double c_delay = 0, q_delay = 100, e_delay = 100, p_delay = 0; + int interval = 5; + double communication_graunlarity = 0.0, computation_granularity = 0.0; + for (int i = 0; i < args.length; i++) { + char key = args[i].charAt(1); + switch (key) { + case 'c': + code = args[++i]; + break; + case 'd': + daxPath = args[++i]; + break; + case 'b': + intraBandwidth = Double.parseDouble(args[++i]); + break; + case 'l': + c_delay = Double.parseDouble(args[++i]); + break; + case 'q': + q_delay = Double.parseDouble(args[++i]); + break; + case 'e': + e_delay = Double.parseDouble(args[++i]); + break; + case 'p': + p_delay = Double.parseDouble(args[++i]); + break; + case 'i': + interval = Integer.parseInt(args[++i]); + break; + case 's': + computation_granularity = Double.parseDouble(args[++i]); + break; + case 'm': + communication_graunlarity = Double.parseDouble(args[++i]); + break; + case 'v': + vmNum = Integer.parseInt(args[++i]); + break; + case 'j': + jobNum = Integer.parseInt(args[++i]); + break; + } + } + // First step: Initialize the WorkflowSim package. + + + /** + * Should change this based on real physical path + */ + //String daxPath = "/Users/chenweiwei/Research/balanced_clustering/generator/BharathiPaper/Fake_1.xml"; + if (daxPath == null) { + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + File daxFile = new File(daxPath); + if (!daxFile.exists()) { + Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!"); + return; + } + /* + * Use default Fault Tolerant Parameters + */ + Parameters.FTCMonitor ftc_monitor = Parameters.FTCMonitor.MONITOR_NONE; + Parameters.FTCFailure ftc_failure = Parameters.FTCFailure.FAILURE_NONE; + Parameters.FTCluteringAlgorithm ftc_method = null; + + /** + * Since we are using MINMIN scheduling algorithm, the planning + * algorithm should be INVALID such that the planner would not + * override the result of the scheduler + */ + Parameters.SchedulingAlgorithm sch_method = Parameters.SchedulingAlgorithm.DATA; + Parameters.PlanningAlgorithm pln_method = Parameters.PlanningAlgorithm.INVALID; + ReplicaCatalog.FileSystem file_system = ReplicaCatalog.FileSystem.LOCAL; + + /** + * clustering delay must be added, if you don't need it, you can set + * all the clustering delay to be zero, but not null + */ + Map clusteringDelay = new HashMap(); + Map queueDelay = new HashMap(); + Map postscriptDelay = new HashMap(); + Map engineDelay = new HashMap(); + /** + * application has at most 11 horizontal levels + */ + int maxLevel = 11; + for (int level = 0; level < maxLevel; level++) { + clusteringDelay.put(level, c_delay); + queueDelay.put(level, q_delay); + postscriptDelay.put(level, p_delay); + engineDelay.put(level, e_delay); + } + // Add clustering delay to the overhead parameters + /** + * Map wed_delay, Map queue_delay, + * Map post_delay, Map + * cluster_delay, + */ + OverheadParameters op = new OverheadParameters(interval, engineDelay, queueDelay, postscriptDelay, clusteringDelay, 0);; + + /** + * Balanced Clustering + */ + ClusteringParameters.ClusteringMethod method = null; + if (code.equalsIgnoreCase("m")) { + method = ClusteringParameters.ClusteringMethod.DFJS; + } else if (code.equalsIgnoreCase("a")) { + method = 
ClusteringParameters.ClusteringMethod.AFJS; + } else if (code.equalsIgnoreCase("n")){ + method = ClusteringParameters.ClusteringMethod.NONE; + } + else { + method = ClusteringParameters.ClusteringMethod.BALANCED; + } + /** + * r: Horizontal Runtime Balancing (HRB) d: Horizontal Distance + * Balancing (HDB) i: Horizontal Impact Factor Balancing (HIFB) h: + * Horizontal Random Balancing , the original horizontal clustering + */ + ClusteringParameters cp = new ClusteringParameters(jobNum, 0, method, code); + if (communication_graunlarity != 0.0) { + cp.setGranularityDataSize(communication_graunlarity); + } + if (computation_granularity != 0.0) { + cp.setGranularityTimeSize(computation_granularity); + } + + /** + * Initialize static parameters + */ + Parameters.init(ftc_method, ftc_monitor, ftc_failure, + null, vmNum, daxPath, null, + null, op, cp, sch_method, pln_method, + null, 0); + ReplicaCatalog.init(file_system); + + FailureMonitor.init(); + FailureGenerator.init(); + + // before creating any entities. + int num_user = 1; // number of grid users + Calendar calendar = Calendar.getInstance(); + boolean trace_flag = false; // mean trace events + + // Initialize the CloudSim library + CloudSim.init(num_user, calendar, trace_flag); + + DatacenterExtended datacenter0 = createDatacenter("Datacenter_0", intraBandwidth, vmNum); + + /** + * Create a WorkflowPlanner with one schedulers. + */ + WorkflowPlanner wfPlanner = new WorkflowPlanner("planner_0", 1); + /** + * Create a WorkflowEngine. + */ + WorkflowEngine wfEngine = wfPlanner.getWorkflowEngine(); + /** + * Create a list of VMs.The userId of a vm is basically the id of + * the scheduler that controls this vm. + */ + List vmlist0 = createVM(wfEngine.getSchedulerId(0), Parameters.getVmNum()); + + /** + * Submits this list of vms to this WorkflowEngine. + */ + wfEngine.submitVmList(vmlist0, 0); + + /** + * Binds the data centers with the scheduler. + */ + wfEngine.bindSchedulerDatacenter(datacenter0.getId(), 0); + + CloudSim.startSimulation(); + + + List outputList0 = wfEngine.getJobsReceivedList(); + + CloudSim.stopSimulation(); + + printJobList(outputList0); + + + } catch (Exception e) { + Log.printLine("The simulation has been terminated due to an unexpected error"); + } + } + + private static DatacenterExtended createDatacenter(String name, double intraBandwidth , int vmNumber) { + + // Here are the steps needed to create a PowerDatacenter: + // 1. We need to create a list to store one or more + // Machines + //int vmNumber = 20; + List hostList = new ArrayList(); + + // 2. A Machine contains one or more PEs or CPUs/Cores. Therefore, should + // create a list to store these PEs before creating + // a Machine. + for (int i = 1; i <= vmNumber; i++) { + List peList1 = new ArrayList(); + int mips = 2000; + // 3. Create PEs and add these into the list. + //for a quad-core machine, a list of 4 PEs is required: + peList1.add(new Pe(0, new PeProvisionerSimple(mips))); // need to store Pe id and MIPS Rating + peList1.add(new Pe(1, new PeProvisionerSimple(mips))); + + int hostId = 0; + int ram = 2048; //host memory (MB) + long storage = 1000000; //host storage + int bw = 10000; + hostList.add( + new Host( + hostId, + new RamProvisionerSimple(ram), + new BwProvisionerSimple(bw), + storage, + peList1, + new VmSchedulerTimeShared(peList1))); // This is our first machine + hostId++; + + } + + // 5. 
Create a DatacenterCharacteristics object that stores the + // properties of a data center: architecture, OS, list of + // Machines, allocation policy: time- or space-shared, time zone + // and its price (G$/Pe time unit). + String arch = "x86"; // system architecture + String os = "Linux"; // operating system + String vmm = "Xen"; + double time_zone = 10.0; // time zone this resource located + double cost = 3.0; // the cost of using processing in this resource + double costPerMem = 0.05; // the cost of using memory in this resource + double costPerStorage = 0.1; // the cost of using storage in this resource + double costPerBw = 0.1; // the cost of using bw in this resource + LinkedList storageList = new LinkedList(); //we are not adding SAN devices by now + DatacenterExtended datacenter = null; + + + DatacenterCharacteristics characteristics = new DatacenterCharacteristics( + arch, os, vmm, hostList, time_zone, cost, costPerMem, costPerStorage, costPerBw); + + + // 6. Finally, we need to create a cluster storage object. + /** + * The bandwidth within a data center. + */ + //double intraBandwidth = 1.5e3;// the number comes from the futuregrid site, you can specify your bw + try { + DistributedClusterStorage s1 = new DistributedClusterStorage(name, 1e12, vmNumber, intraBandwidth / 2); + + // The bandwidth from one vm to another vm + for (int source = 0; source < vmNumber; source++) { + for (int destination = 0; destination < vmNumber; destination++) { + if (source == destination) { + continue; + } + s1.setBandwidth(source, destination, intraBandwidth); + } + } + storageList.add(s1); + datacenter = new DatacenterExtended(name, characteristics, new VmAllocationPolicySimple(hostList), storageList, 0); + } catch (Exception e) { + } + + return datacenter; + } + + /** + * Prints the job objects + * + * @param list list of jobs + */ + private static void printJobList(List list) { + int size = list.size(); + Job job; + + String indent = " "; + Log.printLine(); + Log.printLine("========== OUTPUT =========="); + Log.printLine("Cloudlet ID" + indent + "STATUS" + indent + + "Data center ID" + indent + "VM ID" + indent + indent + "Time" + indent + "Start Time" + indent + "Finish Time" + indent + "Depth"); + + DecimalFormat dft = new DecimalFormat("###.##"); + for (int i = 0; i < size; i++) { + job = list.get(i); + Log.print(indent + job.getCloudletId() + indent + indent); + + if (job.getCloudletStatus() == Cloudlet.SUCCESS) { + Log.print("SUCCESS"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth()); + } else if (job.getCloudletStatus() == Cloudlet.FAILED) { + Log.print("FAILED"); + + Log.printLine(indent + indent + job.getResourceId() + indent + indent + indent + job.getVmId() + + indent + indent + indent + dft.format(job.getActualCPUTime()) + + indent + indent + dft.format(job.getExecStartTime()) + indent + indent + indent + + dft.format(job.getFinishTime()) + indent + indent + indent + job.getDepth()); + } + } + + } +} diff --git a/examples/org/workflowsim/examples/failure/clustering/FaultTolerantClusteringExample3.java b/examples/org/workflowsim/examples/failure/clustering/FaultTolerantClusteringExample3.java index 8c15f670..88ffea0c 100644 --- 
a/examples/org/workflowsim/examples/failure/clustering/FaultTolerantClusteringExample3.java +++ b/examples/org/workflowsim/examples/failure/clustering/FaultTolerantClusteringExample3.java @@ -130,7 +130,7 @@ public static void main(String[] args) { /** * In this example, we have horizontal clustering and we use Dynamic Clustering. */ - Parameters.FTCluteringAlgorithm ftc_method = Parameters.FTCluteringAlgorithm.FTCLUSTERING_SR; + Parameters.FTCluteringAlgorithm ftc_method = Parameters.FTCluteringAlgorithm.FTCLUSTERING_DR; /** * Task failure rate for each level * diff --git a/sources/org/workflowsim/ClusteringEngine.java b/sources/org/workflowsim/ClusteringEngine.java index fe800030..121605a8 100644 --- a/sources/org/workflowsim/ClusteringEngine.java +++ b/sources/org/workflowsim/ClusteringEngine.java @@ -22,8 +22,10 @@ import org.cloudbus.cloudsim.core.CloudSimTags; import org.cloudbus.cloudsim.core.SimEntity; import org.cloudbus.cloudsim.core.SimEvent; +import org.workflowsim.clustering.AFJSClustering; import org.workflowsim.clustering.BasicClustering; import org.workflowsim.clustering.BlockClustering; +import org.workflowsim.clustering.DFJSClustering; import org.workflowsim.clustering.HorizontalClustering; import org.workflowsim.clustering.VerticalClustering; import org.workflowsim.clustering.balancing.BalancedClustering; @@ -178,6 +180,19 @@ else if (params.getClustersSize() != 0) { case BALANCED: this.engine = new BalancedClustering(params.getClustersNum()); break; + /** + * Perform DFJS Clustering + */ + case DFJS: + this.engine = new DFJSClustering(params.getGranularityTimeSize()); + break; + + /** + * Perform AFJS Clustering + */ + case AFJS: + this.engine = new AFJSClustering(params.getGranularityTimeSize(), params.getGranularityDataSize()); + break; /** * By default, it does no clustering */ diff --git a/sources/org/workflowsim/DatacenterExtended.java b/sources/org/workflowsim/DatacenterExtended.java index 4ed5e70a..e9fa6002 100644 --- a/sources/org/workflowsim/DatacenterExtended.java +++ b/sources/org/workflowsim/DatacenterExtended.java @@ -49,7 +49,7 @@ public DatacenterExtended(String name, List storageList, double schedulingInterval) throws Exception { super(name, characteristics, vmAllocationPolicy, storageList, schedulingInterval); - + } @Override diff --git a/sources/org/workflowsim/WorkflowPlanner.java b/sources/org/workflowsim/WorkflowPlanner.java index b7559d35..926098ca 100644 --- a/sources/org/workflowsim/WorkflowPlanner.java +++ b/sources/org/workflowsim/WorkflowPlanner.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.cloudbus.cloudsim.Log; import org.cloudbus.cloudsim.core.CloudSimTags; import org.cloudbus.cloudsim.core.SimEntity; @@ -217,9 +218,16 @@ private void processImpactFactors(List taskList) { exits.add(task); } } + /** + * Instead of giving them average IF for different branches, give them random + * in case of Genome and SIPHT + */ double avg = 1.0 / exits.size(); + Random random = new Random(System.currentTimeMillis()); + double IF = 1.0; for (Task task : exits) { - addImpact(task, avg); + addImpact(task, IF); + IF = random.nextInt(); } } diff --git a/sources/org/workflowsim/clustering/AFJSClustering.java b/sources/org/workflowsim/clustering/AFJSClustering.java new file mode 100644 index 00000000..c424cbfb --- /dev/null +++ b/sources/org/workflowsim/clustering/AFJSClustering.java @@ -0,0 +1,149 @@ +/** + * Copyright 2012-2013 University Of Southern California + * + * Licensed under the Apache 
License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.workflowsim.clustering; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.cloudbus.cloudsim.File; +import org.workflowsim.Task; + +/** + * AFJSClustering is based on Liu&Liao,'s 2009 paper: + * Grouping-based Fine-grained Job Scheduling in Grid Computing + * + * @author Weiwei Chen + * @since WorkflowSim Toolkit 1.0 + * @date Dec 31, 2013 + */ +public class AFJSClustering extends BasicClustering { + + + /** + * The granularity size for computation used in Liu&Liao's paper + */ + private double granularity_time; + /** + * The granularity size for communication used in Liu&Liao's paper + */ + private double granularity_data; + + /** + * The map from depth to tasks at that depth. + */ + private Map mDepth2Task; + + /** + * Initialize a AFJSClustering with two granularity sizes + * @param granularity_time + * @param granularity_data + */ + public AFJSClustering(double granularity_time, double granularity_data) { + super(); + this.granularity_time = granularity_time; + this.granularity_data = granularity_data; + this.mDepth2Task = new HashMap(); + + } + + /** + * The main function + */ + @Override + public void run() { + + for (Iterator it = getTaskList().iterator(); it.hasNext();) { + Task task = (Task) it.next(); + int depth = task.getDepth(); + if (!mDepth2Task.containsKey(depth)) { + mDepth2Task.put(depth, new ArrayList()); + } + ArrayList list = (ArrayList) mDepth2Task.get(depth); + if (!list.contains(task)) { + list.add(task); + } + + } + + /** + * if granularity is set. 
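+         * Merging below happens only when both thresholds are positive; otherwise the
+         * tasks pass through with dependencies updated but unclustered.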
+ */ + if (this.granularity_time > 0 && this.granularity_data > 0) { + doDFJSClustering(); + } + + updateDependencies(); + addClustDelay(); + } + + /** + * Merges tasks based on resource capacity (granularity) + */ + private void doDFJSClustering() { + + for (Iterator it = mDepth2Task.entrySet().iterator(); it.hasNext();) { + Map.Entry pairs = (Map.Entry) it.next(); + ArrayList list = (ArrayList) pairs.getValue(); + + long seed = System.nanoTime(); + //Collections.shuffle(list, new Random(seed)); + seed = System.nanoTime(); + //Collections.shuffle(list, new Random(seed)); + int num = list.size(); + + List taskList = new ArrayList(); + double sum = 0.0, data = 0.0; + for(int i = 0; i < num; i ++){ + Task task = (Task)list.get(i); + double runtime = task.getCloudletLength() / 1000; + + double filesize = 0.0; + for(Iterator fileIt = task.getFileList().iterator(); fileIt.hasNext();){ + File file = (File)fileIt.next(); + filesize += file.getSize(); + } + filesize /= 1000; + if(sum + runtime > this.granularity_time || data + filesize > this.granularity_data){ + if(taskList.isEmpty()){ + taskList.add(task); + addTasks2Job(taskList); + taskList = new ArrayList(); + sum = 0.0; + data = 0.0; + }else{ + addTasks2Job(taskList); + taskList = new ArrayList(); + sum = 0.0; + data = 0.0; + taskList.add(task); + } + }else{ + taskList.add(task); + sum += runtime; + data += filesize; + } + } + if(!taskList.isEmpty()){ + addTasks2Job(taskList); + } + } + } + +} diff --git a/sources/org/workflowsim/clustering/DFJSClustering.java b/sources/org/workflowsim/clustering/DFJSClustering.java new file mode 100644 index 00000000..7d456ca1 --- /dev/null +++ b/sources/org/workflowsim/clustering/DFJSClustering.java @@ -0,0 +1,131 @@ +/** + * Copyright 2012-2013 University Of Southern California + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.workflowsim.clustering; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.workflowsim.Task; + +/** + * DFJSClustering is based on Muthuvelu,'s 2005 paper: + * A Dynamic Job Grouping-Based Scheduling for Deploying Applications with Fine-Grained Tasks on Global Grids + * + * @author Weiwei Chen + * @since WorkflowSim Toolkit 1.0 + * @date Dec 31, 2013 + */ +public class DFJSClustering extends BasicClustering { + + + /** + * The granularity size used in Muthuvelu's paper + */ + private double granularity; + /** + * The map from depth to tasks at that depth. 
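+     * Tasks are bucketed by their workflow depth in run() and merged level by level.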
+ */ + private Map mDepth2Task; + + /** + * Initialize a DFJSClustering with granularity + * @param granularity + */ + public DFJSClustering(double granularity) { + super(); + this.granularity = granularity; + this.mDepth2Task = new HashMap(); + + } + + /** + * The main function + */ + @Override + public void run() { + + for (Iterator it = getTaskList().iterator(); it.hasNext();) { + Task task = (Task) it.next(); + int depth = task.getDepth(); + if (!mDepth2Task.containsKey(depth)) { + mDepth2Task.put(depth, new ArrayList()); + } + ArrayList list = (ArrayList) mDepth2Task.get(depth); + if (!list.contains(task)) { + list.add(task); + } + + } + + /** + * if granularity is set. + */ + if (this.granularity > 0) { + doDFJSClustering(); + } + + updateDependencies(); + addClustDelay(); + } + + /** + * Merges tasks based on resource capacity (granularity) + */ + private void doDFJSClustering() { + + for (Iterator it = mDepth2Task.entrySet().iterator(); it.hasNext();) { + Map.Entry pairs = (Map.Entry) it.next(); + ArrayList list = (ArrayList) pairs.getValue(); + + long seed = System.nanoTime(); + Collections.shuffle(list, new Random(seed)); + seed = System.nanoTime(); + Collections.shuffle(list, new Random(seed)); + int num = list.size(); + + List taskList = new ArrayList(); + double sum = 0.0; + for(int i = 0; i < num; i ++){ + Task task = (Task)list.get(i); + double runtime = task.getCloudletLength() / 1000; + if(sum + runtime > this.granularity ){ + if(taskList.isEmpty()){ + taskList.add(task); + addTasks2Job(taskList); + taskList = new ArrayList(); + sum = 0.0; + }else{ + addTasks2Job(taskList); + taskList = new ArrayList(); + sum = 0.0; + taskList.add(task); + } + }else{ + taskList.add(task); + sum += runtime; + } + } + if(!taskList.isEmpty()){ + addTasks2Job(taskList); + } + } + } + +} diff --git a/sources/org/workflowsim/clustering/balancing/BalancedClustering.java b/sources/org/workflowsim/clustering/balancing/BalancedClustering.java index d9773763..cd62e635 100644 --- a/sources/org/workflowsim/clustering/balancing/BalancedClustering.java +++ b/sources/org/workflowsim/clustering/balancing/BalancedClustering.java @@ -129,31 +129,33 @@ public void printMetrics() { addImpact(set, avg); } - for (Entry entry : map.entrySet()) { - int depth = (Integer) entry.getKey(); - ArrayList list = (ArrayList) entry.getValue(); - /** - * Horizontal Runtime Variance. - */ - double hrv = new HorizontalRuntimeVariance().getMetric(list); - /** - * Impact Factor Variance. - */ - double ifv = new ImpactFactorVariance().getMetric(list); - /** - * Pipeline Runtime Variance. - */ - double prv = new PipelineRuntimeVariance().getMetric(list); - /** - * Distance Variance. - */ - double dv = new DistanceVariance().getMetric(list); - Log.printLine("HRV " + depth + " " + list.size() - + " " + hrv + "\nIFV " + depth + " " - + list.size() + " " + ifv + "\nPRV " + depth - + " " + list.size() + " " + prv + "\nDV " + depth + " " + list.size() + " " + dv); - - } + +// for (Entry entry : map.entrySet()) { +// int depth = (Integer) entry.getKey(); +// ArrayList list = (ArrayList) entry.getValue(); +// /** +// * Horizontal Runtime Variance. +// */ +// double hrv = new HorizontalRuntimeVariance().getMetric(list); +// /** +// * Impact Factor Variance. +// */ +// double ifv = new ImpactFactorVariance().getMetric(list); +// /** +// * Pipeline Runtime Variance. +// */ +// double prv = new PipelineRuntimeVariance().getMetric(list); +// /** +// * Distance Variance. 
+// */ +// double dv = new DistanceVariance().getMetric(list); +// Log.printLine("HRV " + depth + " " + list.size() +// + " " + hrv + "\nIFV " + depth + " " +// + list.size() + " " + ifv + "\nPRV " + depth +// + " " + list.size() + " " + prv + "\nDV " + depth + " " + list.size() + " " + dv); +// +// } + } /** @@ -318,10 +320,10 @@ public void run() { printMetrics(); String code = Parameters.getClusteringParameters().getCode(); - Map> map = getCurrentTaskSetAtLevels(); + Map> map = null; if (code != null) { for (char c : code.toCharArray()) { - + map = getCurrentTaskSetAtLevels(); switch (c) { case 'v': @@ -362,10 +364,11 @@ public void run() { break; } } - printMetrics(); + //printMetrics does not work for vetical clustering + //printMetrics(); } - printOut(); + //printOut(); Collection sets = mTask2TaskSet.values(); for (Iterator it = sets.iterator(); it.hasNext();) { diff --git a/sources/org/workflowsim/clustering/balancing/methods/BalancingMethod.java b/sources/org/workflowsim/clustering/balancing/methods/BalancingMethod.java index 288f7e75..194b631d 100644 --- a/sources/org/workflowsim/clustering/balancing/methods/BalancingMethod.java +++ b/sources/org/workflowsim/clustering/balancing/methods/BalancingMethod.java @@ -79,6 +79,8 @@ public int getClusterNum() { return this.clusterNum; } + + /** * Add all the tasks in tail to head and then clean tail. Can be reused with * verticalClustering() @@ -86,6 +88,16 @@ public int getClusterNum() { * @param head */ public void addTaskSet2TaskSet(TaskSet tail, TaskSet head) { + addTaskSet2TaskSet(tail, head, false); + } + /** + * Add all the tasks in tail to head and then clean tail. Can be reused with + * verticalClustering() + * @param tail + * @param head + * @param vertical + */ + public void addTaskSet2TaskSet(TaskSet tail, TaskSet head, boolean vertical) { head.addTask(tail.getTaskList()); head.getParentList().remove(tail); //update manually, beautifully, I like it here @@ -94,9 +106,13 @@ public void addTaskSet2TaskSet(TaskSet tail, TaskSet head) { } /* * At the same level you can do so, but for vc it doens't, - * while usually for vc we don't need to calculate impact + * */ - head.setImpactFafctor(head.getImpactFactor() + tail.getImpactFactor()); + if(vertical){ + //do not need to change Impact Factor + }else{ + head.setImpactFafctor(head.getImpactFactor() + tail.getImpactFactor()); + } for (TaskSet taskSet : tail.getParentList()) { taskSet.getChildList().remove(tail); if (!taskSet.getChildList().contains(head)) { @@ -109,10 +125,11 @@ public void addTaskSet2TaskSet(TaskSet tail, TaskSet head) { for (TaskSet taskSet : tail.getChildList()) { taskSet.getParentList().remove(tail); - if (!taskSet.getParentList().contains(head)) { + //A big bug fixed + if (!taskSet.getParentList().contains(head) && !taskSet.equals(head)) { taskSet.getParentList().add(head); } - if (!head.getChildList().contains(taskSet)) { + if (!head.getChildList().contains(taskSet) && !taskSet.equals(head)) { head.getChildList().add(taskSet); } } diff --git a/sources/org/workflowsim/clustering/balancing/methods/HorizontalDistanceBalancing.java b/sources/org/workflowsim/clustering/balancing/methods/HorizontalDistanceBalancing.java index 71cde3f4..c3274c13 100644 --- a/sources/org/workflowsim/clustering/balancing/methods/HorizontalDistanceBalancing.java +++ b/sources/org/workflowsim/clustering/balancing/methods/HorizontalDistanceBalancing.java @@ -19,13 +19,14 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; 
import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import org.workflowsim.Task; import org.workflowsim.clustering.TaskSet; -import sun.net.www.http.KeepAliveCache; /** * HorizontalDistanceBalancing is a method that merges tasks based on distance metric @@ -58,6 +59,7 @@ public void run() { } + /** * Sort taskSet based on their impact factors and then merge similar taskSet together * @param taskList @@ -66,31 +68,39 @@ public void process(ArrayList taskList) { if (taskList.size() > getClusterNum()) { ArrayList jobList = new ArrayList(); - for (int i = 0; i < getClusterNum(); i++) { - jobList.add(new TaskSet()); - } - int clusters_size = taskList.size() / getClusterNum(); - if(clusters_size * getClusterNum() < taskList.size()){ - clusters_size ++; - } - //sortListDecreasing(taskList); - preprocessing(taskList, jobList); - - for (TaskSet set : taskList) { - //sortListIncreasing(jobList); - TaskSet job = getCandidateTastSet(jobList, set, clusters_size); - addTaskSet2TaskSet(set, job); - job.addTask(set.getTaskList()); - job.setImpactFafctor(set.getImpactFactor()); - //update dependency - for (Task task : set.getTaskList()) { - getTaskMap().put(task, job);//this is enough - //impact factor is not updated + + + + for (int i = 0; i < getClusterNum(); i++) { + jobList.add(new TaskSet()); + } + int clusters_size = taskList.size() / getClusterNum(); + if(clusters_size * getClusterNum() < taskList.size()){ + clusters_size ++; } - } + //If you have vertical before horizontal make sure adjust this + //for LIGO it is 2 for Genome & CyberShake it is 4 + //clusters_size *= 2; + //sortListDecreasing(taskList); + preprocessing(taskList, jobList); - taskList.clear();//you sure? + for (TaskSet set : taskList) { + //sortListIncreasing(jobList); + TaskSet job = getCandidateTastSet(jobList, set, clusters_size); + addTaskSet2TaskSet(set, job); + job.addTask(set.getTaskList()); + job.setImpactFafctor(set.getImpactFactor()); + //update dependency + for (Task task : set.getTaskList()) { + getTaskMap().put(task, job);//this is enough + //impact factor is not updated + } + + } + + taskList.clear();//you sure? 
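+                //every TaskSet of this level has now been merged into one of the
+                //(at most) clusterNum candidate jobs chosen by getCandidateTastSet();
+                //clusters_size is the ceiling of taskList.size()/clusterNum and acts as
+                //a rough cap on how much work a single job may receive, e.g. 7 TaskSets
+                //with clusterNum = 3 give clusters_size = 3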
+ } else { //do nothing since } @@ -98,7 +108,6 @@ public void process(ArrayList taskList) { } - /** * Sort taskSet in an ascending order of impact factor * @param taskList taskSets to be sorted diff --git a/sources/org/workflowsim/clustering/balancing/methods/HorizontalImpactBalancing.java b/sources/org/workflowsim/clustering/balancing/methods/HorizontalImpactBalancing.java index 836a5cce..7b21dbd3 100644 --- a/sources/org/workflowsim/clustering/balancing/methods/HorizontalImpactBalancing.java +++ b/sources/org/workflowsim/clustering/balancing/methods/HorizontalImpactBalancing.java @@ -60,16 +60,18 @@ public void run() { } -// protected TaskSet getCandidateTastSet(ArrayList taskList, TaskSet checkSet) { -// long min = taskList.get(0).getJobRuntime(); -// for (TaskSet set : taskList) { -// if (set.getJobRuntime() == min && checkSet.getImpactFactor() == set.getImpactFactor()) { -// return set; -// } -// } -// return taskList.get(0); -// -// } + private double getHIFV(ArrayList jobList){ + double sum = 0.0; + double sumSquare = 0.0; + for(TaskSet taskSet : jobList){ + sum += taskSet.getImpactFactor(); + sumSquare += taskSet.getImpactFactor() * taskSet.getImpactFactor(); + } + double avg = sum / jobList.size(); + double avgSquare = sumSquare / jobList.size(); + double variance = avgSquare - avg * avg; + return variance ; + } /** * Sort taskSet based on their impact factors and then merge similar taskSet together @@ -79,48 +81,66 @@ public void process(ArrayList taskList) { if (taskList.size() > getClusterNum()) { ArrayList jobList = new ArrayList(); - for (int i = 0; i < getClusterNum(); i++) { - jobList.add(new TaskSet()); - } - int clusters_size = taskList.size() / getClusterNum(); - if(clusters_size * getClusterNum() < taskList.size()){ - clusters_size ++; - } - sortListDecreasing(taskList); - //preprocessing(taskList, jobList); - + if( Math.abs(getHIFV(taskList)) < 0.0001 ) + { + for (int i = 0; i < getClusterNum(); i++) { + jobList.add(new TaskSet()); + } + sortListDecreasing(taskList); + for (TaskSet set : taskList) { + //MinHeap is required + sortListIncreasing(jobList); + TaskSet job = (TaskSet) jobList.get(0); + job.addTask(set.getTaskList()); + //update dependency + for (Task task : set.getTaskList()) { + getTaskMap().put(task, job);//this is enough + } - for (TaskSet set : taskList) { - //sortListIncreasing(jobList); - //Log.printLine(set.getJobRuntime()); - TaskSet job = null; - try{ - job = getCandidateTastSet(jobList, set, clusters_size); - }catch(Exception e) { - e.printStackTrace(); } - addTaskSet2TaskSet(set, job); - job.addTask(set.getTaskList()); - job.setImpactFafctor(set.getImpactFactor()); - //update dependency - for (Task task : set.getTaskList()) { - getTaskMap().put(task, job);//this is enough - //impact factor is not updated + + taskList.clear();//you sure? 
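+                //getHIFV() returned a variance below 0.0001, i.e. the impact factors at
+                //this level are practically uniform, so each TaskSet was simply appended
+                //to the first job after re-sorting jobList (a min-heap would avoid the
+                //repeated sort, as the comment above notes)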
+ }else{ + + for (int i = 0; i < getClusterNum(); i++) { + jobList.add(new TaskSet()); + } + int clusters_size = taskList.size() / getClusterNum(); + if(clusters_size * getClusterNum() < taskList.size()){ + clusters_size ++; } + sortListDecreasing(taskList); + //preprocessing(taskList, jobList); - } + //In the case if you do vertical first you need to multiple clusters_size with your vertical strength + //If you do vertical clustering after you don't have to adjust this + //For LIGO it is 2 for genome it is 4 + //clusters_size *= 4; -// Log.printLine("level"); -// for(int i = 0; i < taskList.size();i++){ -// TaskSet set = (TaskSet) (taskList.get(i)); -// Log.printLine("TaskSet "); -// for(int j = 0; j < set.getTaskList().size(); j++){ -// Task task = set.getTaskList().get(j); -// Log.printLine("Task: " + task.getImpact()); -// } -// } - taskList.clear();//you sure? + for (TaskSet set : taskList) { + //sortListIncreasing(jobList); + //Log.printLine(set.getJobRuntime()); + TaskSet job = null; + try{ + + job = getCandidateTastSet(jobList, set, clusters_size); + }catch(Exception e) { + e.printStackTrace(); + } + addTaskSet2TaskSet(set, job); + job.addTask(set.getTaskList()); + job.setImpactFafctor(set.getImpactFactor()); + //update dependency + for (Task task : set.getTaskList()) { + getTaskMap().put(task, job);//this is enough + //impact factor is not updated + } + + } + + taskList.clear();//you sure? + } } else { //do nothing since } @@ -246,7 +266,7 @@ private ArrayList getNextPotentialTaskSets(ArrayList taskList, TaskSet checkSet, int clusters_size){ HashMap map = new HashMap(); - + for (TaskSet set : taskList) { double factor = set.getImpactFactor(); @@ -267,7 +287,6 @@ private ArrayList getNextPotentialTaskSets(ArrayList taskList, } } } - if(returnList.isEmpty()){ ArrayList zeros = (ArrayList)map.get(0.0); if(zeros!=null && !zeros.isEmpty()) @@ -275,7 +294,6 @@ private ArrayList getNextPotentialTaskSets(ArrayList taskList, returnList.addAll(zeros); } } - if(returnList.isEmpty()){ returnList.clear();//? 
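+            //nothing matched the impact-factor and size criteria above, so fall back
+            //to scanning every TaskSet at this level for a candidate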
for (TaskSet set : taskList) { diff --git a/sources/org/workflowsim/clustering/balancing/methods/VerticalBalancing.java b/sources/org/workflowsim/clustering/balancing/methods/VerticalBalancing.java index ccf6b6cf..91fbbdd0 100644 --- a/sources/org/workflowsim/clustering/balancing/methods/VerticalBalancing.java +++ b/sources/org/workflowsim/clustering/balancing/methods/VerticalBalancing.java @@ -61,7 +61,7 @@ public void run() { ArrayList pList = child.getParentList(); if (pList.size() == 1) { //add parent to child (don't do it reversely) - addTaskSet2TaskSet(set, child); + addTaskSet2TaskSet(set, child, true); } } diff --git a/sources/org/workflowsim/scheduling/DataAwareSchedulingAlgorithm.java b/sources/org/workflowsim/scheduling/DataAwareSchedulingAlgorithm.java index 0eabe51b..c462df96 100644 --- a/sources/org/workflowsim/scheduling/DataAwareSchedulingAlgorithm.java +++ b/sources/org/workflowsim/scheduling/DataAwareSchedulingAlgorithm.java @@ -77,9 +77,9 @@ public void run() { - Log.printLine("Schedules " + cloudlet.getCloudletId() + " with " - + cloudlet.getCloudletLength() + " to VM " + closestVm.getId() - +" with " + closestVm.getCurrentRequestedTotalMips() + " and data is " + minTime); + //Log.printLine("Schedules " + cloudlet.getCloudletId() + " with " + // + cloudlet.getCloudletLength() + " to VM " + closestVm.getId() + // +" with " + closestVm.getCurrentRequestedTotalMips() + " and data is " + minTime); } diff --git a/sources/org/workflowsim/utils/ClusteringParameters.java b/sources/org/workflowsim/utils/ClusteringParameters.java index 903c576d..a578c61c 100644 --- a/sources/org/workflowsim/utils/ClusteringParameters.java +++ b/sources/org/workflowsim/utils/ClusteringParameters.java @@ -39,7 +39,7 @@ public class ClusteringParameters { */ public enum ClusteringMethod { - HORIZONTAL, VERTICAL, NONE, BLOCK, BALANCED + HORIZONTAL, VERTICAL, NONE, BLOCK, BALANCED, DFJS, AFJS } /** * Used for balanced clustering to tell which specific balanced clustering @@ -51,6 +51,17 @@ public enum ClusteringMethod { */ private ClusteringMethod method; + /** + * The granularity size of computational resource + * used in AFJS and DFJS clustering algorithm + */ + private double granularity_time; + + /** + * The granularity size of communicational resource + * used in AFJS clustering algorithm + */ + private double granularity_data; /** * Gets the code for balanced clustering Please refer to our balanced * clustering paper for details @@ -88,7 +99,44 @@ public int getClustersSize() { public ClusteringMethod getClusteringMethod() { return method; } + + /** + * Gets the granularity data size + * + * @return granularity data + */ + public double getGranularityDataSize() + { + return this.granularity_data; + } + + /** + * Gets the granularity time size + * @return granularity time + */ + public double getGranularityTimeSize() + { + return this.granularity_time; + } + /** + * Sets the granularity time size + * @param granularity the granularity time size + */ + public void setGranularityTimeSize(double granularity) + { + this.granularity_time = granularity; + } + + /** + * Sets the granularity data size + * @param granularity the granularity data size + */ + public void setGranularityDataSize(double granularity) + { + this.granularity_data = granularity; + } + /** * Initialize a ClusteringParameters *