diff --git a/examples/org/workflowsim/examples/scheduling/DataAwareSchedulingAlgorithmExample2.java b/examples/org/workflowsim/examples/scheduling/DataAwareSchedulingAlgorithmExample2.java
new file mode 100644
index 00000000..4333aed2
--- /dev/null
+++ b/examples/org/workflowsim/examples/scheduling/DataAwareSchedulingAlgorithmExample2.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright 2012-2013 University Of Southern California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.workflowsim.examples.scheduling;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.List;
+import org.cloudbus.cloudsim.Log;
+import org.cloudbus.cloudsim.core.CloudSim;
+import org.workflowsim.CondorVM;
+import org.workflowsim.WorkflowDatacenter;
+import org.workflowsim.Job;
+import org.workflowsim.WorkflowEngine;
+import org.workflowsim.WorkflowPlanner;
+import org.workflowsim.examples.planning.DHEFTPlanningAlgorithmExample1;
+import org.workflowsim.utils.ClusteringParameters;
+import org.workflowsim.utils.OverheadParameters;
+import org.workflowsim.utils.Parameters;
+import org.workflowsim.utils.ReplicaCatalog;
+
+/**
+ * This example demonstrates the data-aware scheduling algorithm with a RANDOM
+ * replica catalog.
+ *
+ * @author Weiwei Chen
+ * @since WorkflowSim Toolkit 1.1
+ * @date Nov 9, 2013
+ */
+public class DataAwareSchedulingAlgorithmExample2 extends DHEFTPlanningAlgorithmExample1 {
+
+    ////////////////////////// STATIC METHODS ///////////////////////
+    /**
+     * Creates main() to run this example. This example has only one datacenter
+     * and one storage.
+     */
+    public static void main(String[] args) {
+
+        try {
+            // First step: Initialize the WorkflowSim package.
+
+            /**
+             * However, the exact number of vms may not necessarily be vmNum.
+             * If the data center or the host doesn't have sufficient
+             * resources, the exact vmNum would be smaller than that. Take
+             * care.
+             */
+            int vmNum = 5; // number of vms
+            /**
+             * Should change this based on the real physical path
+             */
+            String daxPath = "/Users/chenweiwei/Work/WorkflowSim-1.0/config/dax/Montage_100.xml";
+
+            File daxFile = new File(daxPath);
+            if (!daxFile.exists()) {
+                Log.printLine("Warning: Please replace daxPath with the physical path in your working environment!");
+                return;
+            }
+
+            /**
+             * Since we are using the data-aware scheduling algorithm, the
+             * planning algorithm should be INVALID such that the planner
+             * would not override the decisions of the scheduler.
+             */
+            Parameters.SchedulingAlgorithm sch_method = Parameters.SchedulingAlgorithm.DATA;
+            Parameters.PlanningAlgorithm pln_method = Parameters.PlanningAlgorithm.INVALID;
+            ReplicaCatalog.FileSystem file_system = ReplicaCatalog.FileSystem.RANDOM;
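+            /*
+             * For contrast, a planner-driven run (as in the HEFT planning
+             * examples) would pair a static scheduler with a real planner,
+             * e.g. sch_method = Parameters.SchedulingAlgorithm.STATIC and
+             * pln_method = Parameters.PlanningAlgorithm.HEFT. Those enum
+             * values are assumed from the other planning examples, not from
+             * this patch.
+             */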
+
+            /**
+             * No overheads
+             */
+            OverheadParameters op = new OverheadParameters(0, null, null, null, null, 0);
+
+            /**
+             * No Clustering
+             */
+            ClusteringParameters.ClusteringMethod method = ClusteringParameters.ClusteringMethod.NONE;
+            ClusteringParameters cp = new ClusteringParameters(0, 0, method, null);
+
+            /**
+             * Initialize static parameters
+             */
+            Parameters.init(vmNum, daxPath, null,
+                    null, op, cp, sch_method, pln_method,
+                    null, 0);
+            ReplicaCatalog.init(file_system);
+
+            // before creating any entities.
+            int num_user = 1; // number of grid users
+            Calendar calendar = Calendar.getInstance();
+            boolean trace_flag = false; // trace events
+
+            // Initialize the CloudSim library
+            CloudSim.init(num_user, calendar, trace_flag);
+
+            WorkflowDatacenter datacenter0 = createDatacenter("Datacenter_0");
+
+            /**
+             * Create a WorkflowPlanner with one scheduler.
+             */
+            WorkflowPlanner wfPlanner = new WorkflowPlanner("planner_0", 1);
+            /**
+             * Create a WorkflowEngine.
+             */
+            WorkflowEngine wfEngine = wfPlanner.getWorkflowEngine();
+            /**
+             * Create a list of VMs. The userId of a vm is basically the id of
+             * the scheduler that controls this vm.
+             */
+            List<CondorVM> vmlist0 = createVM(wfEngine.getSchedulerId(0), Parameters.getVmNum());
+
+            /**
+             * Submits this list of vms to this WorkflowEngine.
+             */
+            wfEngine.submitVmList(vmlist0, 0);
+
+            /**
+             * Binds the data centers with the scheduler.
+             */
+            wfEngine.bindSchedulerDatacenter(datacenter0.getId(), 0);
+
+            CloudSim.startSimulation();
+
+            List<Job> outputList0 = wfEngine.getJobsReceivedList();
+
+            CloudSim.stopSimulation();
+
+            printJobList(outputList0);
+
+        } catch (Exception e) {
+            Log.printLine("The simulation has been terminated due to an unexpected error");
+        }
+    }
+}
diff --git a/sources/org/workflowsim/WorkflowDatacenter.java b/sources/org/workflowsim/WorkflowDatacenter.java
index db792e51..05e19ff4 100644
--- a/sources/org/workflowsim/WorkflowDatacenter.java
+++ b/sources/org/workflowsim/WorkflowDatacenter.java
@@ -17,6 +17,7 @@
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import org.cloudbus.cloudsim.Cloudlet;
 import org.cloudbus.cloudsim.CloudletScheduler;
 import org.cloudbus.cloudsim.Consts;
@@ -57,6 +58,16 @@ public WorkflowDatacenter(String name,
 
     @Override
     protected void processOtherEvent(SimEvent ev) {
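+        /*
+         * FILE_STAGE_OUT events are scheduled by register() below with a
+         * delay equal to the estimated transfer time. When one arrives, the
+         * destination vm is recorded as a new replica location so that later
+         * stage-in calls may read from the fastest available copy.
+         */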
+        switch (ev.getTag()) {
+            case WorkflowSimTags.FILE_STAGE_OUT:
+                FileStageOutMessage message = (FileStageOutMessage) ev.getData();
+                ReplicaCatalog.addStorageList(message.getFileName(),
+                        Integer.toString(message.getDestinationVm()));
+                break;
+            default:
+                break;
+        }
+
     }
 
     /**
@@ -110,12 +121,12 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
         int userId = cl.getUserId();
         int vmId = cl.getVmId();
         Host host = getVmAllocationPolicy().getHost(vmId, userId);
-        CondorVM vm = (CondorVM)host.getVm(vmId, userId);
+        CondorVM vm = (CondorVM) host.getVm(vmId, userId);
         switch (Parameters.getCostModel()) {
             case DATACENTER:
                 // process this Cloudlet to this CloudResource
-                cl.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(), 
+                cl.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(),
                         getCharacteristics().getCostPerBw());
                 break;
             case VM:
@@ -222,7 +233,7 @@ private void stageInFile2FileSystem(Cloudlet cl) {
              * name)
              */
             case LOCAL:
-
+            case RANDOM:
                 ReplicaCatalog.addStorageList(file.getName(), this.getName());
                 /**
                  * It is not really needed currently but it is left for
@@ -287,11 +298,14 @@ protected double processDataStageIn(List requiredFiles, Cloudlet cl) throw
             File file = iter.next();
             //The input file is not an output File
             if (isRealInputFile(requiredFiles, file)) {
-                double maxBwth = 0.0;
                 List siteList = ReplicaCatalog.getStorageList(file.getName());
                 if (siteList.isEmpty()) {
                     throw new Exception(file.getName() + " does not exist");
                 }
+                int vmId = cl.getVmId();
+                int userId = cl.getUserId();
+                Host host = getVmAllocationPolicy().getHost(vmId, userId);
+                Vm vm = host.getVm(vmId, userId);
                 switch (ReplicaCatalog.getFileSystem()) {
                     case SHARED:
                         //stage-in job
@@ -311,53 +325,12 @@ protected double processDataStageIn(List requiredFiles, Cloudlet cl) throw
                         }
                         break;
                     case LOCAL:
+                        time += calculateDataTransferDelay(file, userId, vmId, vm);
+                        ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
+                        break;
-                        int vmId = cl.getVmId();
-                        int userId = cl.getUserId();
-                        Host host = getVmAllocationPolicy().getHost(vmId, userId);
-                        Vm vm = host.getVm(vmId, userId);
-
-                        boolean requiredFileStagein = true;
-
-                        for (Iterator it = siteList.iterator(); it.hasNext();) {
-                            //site is where one replica of this data is located at
-                            String site = (String) it.next();
-                            if (site.equals(this.getName())) {
-                                continue;
-                            }
-                            /**
-                             * This file is already in the local vm and thus it
-                             * is no need to transfer
-                             */
-                            if (site.equals(Integer.toString(vmId))) {
-                                requiredFileStagein = false;
-                                break;
-                            }
-                            double bwth;
-                            if (site.equals(Parameters.SOURCE)) {
-                                //transfers from the source to the VM is limited to the VM bw only
-                                bwth = vm.getBw();
-                                //bwth = dcStorage.getBaseBandwidth();
-                            } else {
-                                //transfers between two VMs is limited to both VMs
-                                bwth = Math.min(vm.getBw(), getVmAllocationPolicy().getHost(Integer.parseInt(site), userId).getVm(Integer.parseInt(site), userId).getBw());
-                                //bwth = dcStorage.getBandwidth(Integer.parseInt(site), vmId);
-                            }
-                            if (bwth > maxBwth) {
-                                maxBwth = bwth;
-                            }
-                        }
-                        if (requiredFileStagein && maxBwth > 0.0) {
-                            time += file.getSize() / Consts.MILLION * 8 / maxBwth;
-                        }
-
-                        /**
-                         * For the case when storage is too small it is not
-                         * handled here
-                         */
-                        //We should add but since CondorVm has a small capability it often fails
-                        //We currently don't use this storage to do anything meaningful. It is left for future.
-                        //condorVm.addLocalFile(file);
+                    case RANDOM:
+                        time += calculateDataTransferDelay(file, userId, vmId, vm);
                         ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
                         break;
                 }
@@ -419,6 +392,48 @@ protected void checkCloudletCompletion() {
             }
         }
     }
+
+    private double calculateDataTransferDelay(File file, int userId, int vmId, Vm vm) {
+        List siteList = ReplicaCatalog.getStorageList(file.getName());
+        boolean requiredFileStagein = true;
+        double time = 0.0;
+        double maxBwth = 0.0;
+        for (Iterator it = siteList.iterator(); it.hasNext();) {
+            //site is where one replica of this data is located
+            String site = (String) it.next();
+            if (site.equals(this.getName())) {
+                continue;
+            }
+            /**
+             * The file is already in the local vm and thus there is no need
+             * to transfer it
+             */
+            if (site.equals(Integer.toString(vmId))) {
+                requiredFileStagein = false;
+                break;
+            }
+            double bwth;
+            if (site.equals(Parameters.SOURCE)) {
+                //transfers from the source to the VM are limited to the VM bw only
+                bwth = vm.getBw();
+                //bwth = dcStorage.getBaseBandwidth();
+            } else {
+                //transfers between two VMs are limited to the bw of both VMs
+                bwth = Math.min(vm.getBw(),
+                        getVmAllocationPolicy().getHost(Integer.parseInt(site), userId)
+                                .getVm(Integer.parseInt(site), userId).getBw());
+                //bwth = dcStorage.getBandwidth(Integer.parseInt(site), vmId);
+            }
+            if (bwth > maxBwth) {
+                maxBwth = bwth;
+            }
+        }
+        if (requiredFileStagein && maxBwth > 0.0) {
+            time = file.getSize() / (double) Consts.MILLION * 8 / maxBwth;
+        }
+        return time;
+    }
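+
+    /*
+     * Back-of-the-envelope check on the formula above, assuming file sizes
+     * in bytes and vm bandwidth in Mbit/s (the units are not fixed by this
+     * patch): a 100 MB input with maxBwth = 1000 Mbit/s costs
+     *     1.0e8 / Consts.MILLION * 8 / 1000 = 0.8 seconds,
+     * i.e. bytes are converted to megabits before dividing by the bottleneck
+     * bandwidth; the (double) cast prevents integer truncation.
+     */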
     /*
      * Register a file to the storage if it is an output file
      * @param requiredFiles, all files to be staged in
@@ -426,7 +441,6 @@ protected void checkCloudletCompletion() {
      * @pre $none
      * @post $none
      */
-
     private void register(Cloudlet cl) {
         Task tl = (Task) cl;
         List fList = tl.getFileList();
@@ -435,23 +449,57 @@ private void register(Cloudlet cl) {
             if (file.getType() == FileType.OUTPUT.value)//output file
             {
+                int vmId = cl.getVmId();
+                int userId = cl.getUserId();
+                Host host = getVmAllocationPolicy().getHost(vmId, userId);
+                CondorVM vm = (CondorVM) host.getVm(vmId, userId);
                 switch (ReplicaCatalog.getFileSystem()) {
                     case SHARED:
                         ReplicaCatalog.addStorageList(file.getName(), this.getName());
                         break;
                     case LOCAL:
-                        int vmId = cl.getVmId();
-                        int userId = cl.getUserId();
-                        Host host = getVmAllocationPolicy().getHost(vmId, userId);
-                        /**
-                         * Left here for future work
-                         */
-                        CondorVM vm = (CondorVM) host.getVm(vmId, userId);
-
                         ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
                         break;
+                    case RANDOM:
+                        ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
+                        Random random = new Random(System.currentTimeMillis());
+                        double factor = 0.1;
+                        int vm2copy = (int) ((double) Parameters.getVmNum() * factor);
+                        for (int i = 0; i < vm2copy; i++) {
+                            int destination = (int) (random.nextDouble() * (double) Parameters.getVmNum());
+                            FileStageOutMessage message = new FileStageOutMessage(destination, vmId, file.getName());
+                            double delay = calculateDataTransferDelay(file, userId, vmId, vm);
+                            send(this.getId(), delay, WorkflowSimTags.FILE_STAGE_OUT, message);
+                        }
+                        break;
                 }
             }
         }
     }
+
+    /**
+     * A message that records the stage-out of a file from a source vm to a
+     * destination vm.
+     */
+    private class FileStageOutMessage {
+
+        private final int dest;
+        private final int src;
+        private final String name;
+
+        public FileStageOutMessage(int dest, int src, String name) {
+            this.dest = dest;
+            this.src = src;
+            this.name = name;
+        }
+
+        public int getDestinationVm() {
+            return this.dest;
+        }
+
+        public int getSourceVm() {
+            return this.src;
+        }
+
+        public String getFileName() {
+            return this.name;
+        }
+    }
 }
diff --git a/sources/org/workflowsim/WorkflowSimTags.java b/sources/org/workflowsim/WorkflowSimTags.java
index 6d804c7e..7a5f8e0d 100644
--- a/sources/org/workflowsim/WorkflowSimTags.java
+++ b/sources/org/workflowsim/WorkflowSimTags.java
@@ -44,6 +44,8 @@ public class WorkflowSimTags {
     public static final int JOB_SUBMIT = BASE + 1;
     public static final int CLOUDLET_UPDATE = BASE + 5;
     public static final int CLOUDLET_CHECK = BASE + 6;
+
+    public static final int FILE_STAGE_OUT = BASE + 7;
 
     /**
      * Private Constructor
diff --git a/sources/org/workflowsim/utils/ReplicaCatalog.java b/sources/org/workflowsim/utils/ReplicaCatalog.java
index edd7fb13..797b8407 100644
--- a/sources/org/workflowsim/utils/ReplicaCatalog.java
+++ b/sources/org/workflowsim/utils/ReplicaCatalog.java
@@ -36,7 +36,7 @@ public class ReplicaCatalog {
      */
     public enum FileSystem {
 
-        SHARED, LOCAL
+        SHARED, LOCAL, RANDOM
     }
 
     /**
      * Map from file name to a file object
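
A note on the RANDOM fan-out in register(): vm2copy = (int) ((double) Parameters.getVmNum() * factor) rounds down, so with this example's 5 vms and factor = 0.1 no FILE_STAGE_OUT event is ever sent; at least 10 vms are needed before the first replica is copied, and a randomly drawn destination may coincide with the source vm. A minimal standalone sketch of that arithmetic (not part of the patch; class and method names are illustrative):

    public class RandomFanOutCheck {

        // vm2copy as computed in register(): floor(vmNum * factor)
        static int copiesPerFile(int vmNum, double factor) {
            return (int) (vmNum * factor);
        }

        public static void main(String[] args) {
            System.out.println(copiesPerFile(5, 0.1));  // 0 -> no replication in this example
            System.out.println(copiesPerFile(10, 0.1)); // 1
            System.out.println(copiesPerFile(25, 0.1)); // 2
        }
    }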