From 0499e11dedfc77c64ca57917952d56397f53508f Mon Sep 17 00:00:00 2001 From: Lewis John McGibbney Date: Wed, 2 May 2018 09:27:19 -0700 Subject: [PATCH 1/2] SDAP-13 Rearchitect MUDROD storage --- .gitignore | 6 + core/pom.xml | 2 +- .../structure/HybridRecommendation.java | 47 +- .../sdap/mudrod/utils/ClassLoadingUtils.java | 83 +++ .../sdap/mudrod/utils/ReflectionUtils.java | 148 ++++ pom.xml | 12 +- service/pom.xml | 6 +- storage/pom.xml | 142 ++++ .../sdap/mudrod/storage/StorageDriver.java | 100 +++ .../mudrod/storage/StorageDriverFactory.java | 89 +++ .../elasticsearch/ElasticSearchDriver.java | 636 ++++++++++++++++++ .../storage/elasticsearch/package-info.java | 14 + .../sdap/mudrod/storage/package-info.java | 14 + .../sdap/mudrod/storage/solr/SolrDriver.java | 189 ++++++ .../mudrod/storage/solr/package-info.java | 14 + web/pom.xml | 2 +- 16 files changed, 1474 insertions(+), 30 deletions(-) create mode 100644 core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java create mode 100644 core/src/main/java/org/apache/sdap/mudrod/utils/ReflectionUtils.java create mode 100644 storage/pom.xml create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/ElasticSearchDriver.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/package-info.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/package-info.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/solr/SolrDriver.java create mode 100644 storage/src/main/java/org/apache/sdap/mudrod/storage/solr/package-info.java diff --git a/.gitignore b/.gitignore index 3009cbf..56afcc1 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,9 @@ core/.externalToolBuilders/Maven_Ant_Builder.launch core/maven-eclipse.xml 
service/.classpath web/.classpath +storage/lib +storage/.classpath +storage/.externalToolBuilders/Maven_Ant_Builder.launch +storage/.gitignore +storage/maven-eclipse.xml +storage/target diff --git a/core/pom.xml b/core/pom.xml index 2725e87..93d5b34 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -18,7 +18,7 @@ 4.0.0 - org.apache.sdap.mudrod + org.apache.sdap mudrod-parent 0.0.1-SNAPSHOT ../ diff --git a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java index 2b29c03..662f8d6 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java +++ b/core/src/main/java/org/apache/sdap/mudrod/recommendation/structure/HybridRecommendation.java @@ -33,13 +33,11 @@ import java.util.*; /** - * Recommend metadata using combination all two methods, including content-based - * similarity and session-level similarity + * Recommend metadata using combination of content-based + * similarity and session-level similarity. 
*/ public class HybridRecommendation extends DiscoveryStepAbstract { - /** - * - */ + private static final long serialVersionUID = 1L; // recommended metadata list protected transient List termList = new ArrayList<>(); @@ -50,17 +48,17 @@ public class HybridRecommendation extends DiscoveryStepAbstract { private static final String WEIGHT = "weight"; /** - * recommended data class Date: Sep 12, 2016 2:25:28 AM + * recommended data class */ class LinkedTerm { - public String term = null; - public double weight = 0; - public String model = null; + private String term; + private double weight = 0; + private String model; public LinkedTerm(String str, double w, String m) { - term = str; - weight = w; - model = m; + this.term = str; + this.weight = w; + this.model = m; } } @@ -79,7 +77,7 @@ public Object execute(Object o) { } /** - * Get recommended data for a giving dataset + * Get recommended data for a given dataset * * @param input: a giving dataset * @param num: the number of recommended dataset @@ -104,15 +102,15 @@ public JsonObject getRecomDataInJson(String input, int num) { JsonElement sessionSimJson = mapToJson(sortedSessionSimMap, num); resultJson.add("sessionSim", sessionSimJson); - Map hybirdSimMap = new HashMap(); + Map hybirdSimMap = new HashMap<>(); for (String name : sortedAbstractSimMap.keySet()) { - hybirdSimMap.put(name, sortedAbstractSimMap.get(name) /** 0.4 */); + hybirdSimMap.put(name, sortedAbstractSimMap.get(name)); } for (String name : sortedVariableSimMap.keySet()) { if (hybirdSimMap.get(name) != null) { - double sim = hybirdSimMap.get(name) + sortedVariableSimMap.get(name) /** 0.3 */; + double sim = hybirdSimMap.get(name) + sortedVariableSimMap.get(name); hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); } else { double sim = sortedVariableSimMap.get(name); @@ -122,7 +120,7 @@ public JsonObject getRecomDataInJson(String input, int num) { for (String name : sortedSessionSimMap.keySet()) { if (hybirdSimMap.get(name) != null) { - double 
sim = hybirdSimMap.get(name) + sortedSessionSimMap.get(name) /** 0.1 */; + double sim = hybirdSimMap.get(name) + sortedSessionSimMap.get(name); hybirdSimMap.put(name, Double.parseDouble(df.format(sim))); } else { double sim = sortedSessionSimMap.get(name); @@ -164,13 +162,11 @@ protected JsonElement mapToJson(Map wordweights, int num) { } String nodesJson = gson.toJson(nodes); - JsonElement nodesElement = gson.fromJson(nodesJson, JsonElement.class); - - return nodesElement; + return gson.fromJson(nodesJson, JsonElement.class); } /** - * Get recommend dataset for a giving dataset + * Get recommend dataset for a given dataset * * @param type recommend method * @param input a giving dataset @@ -206,8 +202,12 @@ public Map getRelatedData(String type, String input, int num) { */ public List getRelatedDataFromES(String type, String input, int num) { - SearchRequestBuilder builder = es.getClient().prepareSearch(props.getProperty(INDEX_NAME)).setTypes(type).setQuery(QueryBuilders.termQuery("concept_A", input)).addSort(WEIGHT, SortOrder.DESC) - .setSize(num); + SearchRequestBuilder builder = es.getClient() + .prepareSearch(props.getProperty(INDEX_NAME)) + .setTypes(type) + .setQuery(QueryBuilders.termQuery("concept_A", input)) + .addSort(WEIGHT, SortOrder.DESC) + .setSize(num); SearchResponse usrhis = builder.execute().actionGet(); @@ -266,7 +266,6 @@ public static void main(String[] args) throws IOException { ESDriver es = new ESDriver(me.getConfig()); HybridRecommendation test = new HybridRecommendation(props, es, null); - // String input = "NSCAT_LEVEL_1.7_V2"; String input = "AQUARIUS_L3_SSS_SMIA_MONTHLY-CLIMATOLOGY_V4"; JsonObject json = test.getRecomDataInJson(input, 10); diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java b/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java new file mode 100644 index 0000000..bda78a5 --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java @@ -0,0 
+1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +public class ClassLoadingUtils { + + private ClassLoadingUtils() { + //Utility Class + } + + /** + * Loads a class using the class loader. + * 1. The class loader of the current class is being used. + * 2. The thread context class loader is being used. + * If both approaches fail, returns null. + * + * @param className The name of the class to load. + * @return The class or null if no class loader could load the class. + * @throws ClassNotFoundException if and only if no definition for the class with the specified name could be found. + */ + public static Class loadClass(String className) throws ClassNotFoundException { + return ClassLoadingUtils.loadClass(ClassLoadingUtils.class,className); + } + + /** + * Loads a class using the class loader. + * 1. The class loader of the context class is being used. + * 2. The thread context class loader is being used. + * If both approaches fail, returns null. + * + * @param contextClass The name of a context class to use. + * @param className The name of the class to load. + * @return The class or null if no class loader could load the class. 
+ * @throws ClassNotFoundException if and only if no definition for the class with the specified name could be found. + */ + public static Class loadClass(Class contextClass, String className) throws ClassNotFoundException { + Class clazz = null; + if (contextClass.getClassLoader() != null) { + clazz = loadClass(className, contextClass.getClassLoader()); + } + if (clazz == null && Thread.currentThread().getContextClassLoader() != null) { + clazz = loadClass(className, Thread.currentThread().getContextClassLoader()); + } + if (clazz == null) { + throw new ClassNotFoundException("Failed to load class" + className); + } + return clazz; + } + + /** + * Loads a {@link Class} from the specified {@link ClassLoader} without throwing {@link ClassNotFoundException}. + * + * @param className The name of the class to load. + * @param classLoader Class loader instance from which the given class is to be loaded. + * @return The class, or null if the given class loader could not load the class. + */ + private static Class loadClass(String className, ClassLoader classLoader) { + Class clazz = null; + if (classLoader != null && className != null) { + try { + clazz = classLoader.loadClass(className); + } catch (ClassNotFoundException e) { + //Ignore and return null + } + } + return clazz; + } +} diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/ReflectionUtils.java b/core/src/main/java/org/apache/sdap/mudrod/utils/ReflectionUtils.java new file mode 100644 index 0000000..b902f7e --- /dev/null +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/ReflectionUtils.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.utils; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * Utility methods related to reflection. + */ +public class ReflectionUtils { + + public static Class[] EMPTY_CLASS_ARRAY = new Class[0]; + public static Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + + /** + * Returns the empty argument constructor of the class. + * + * @param clazz the class reference of given type T. + * @param class type variable. + * @return constructor for the given class type T. + * @throws SecurityException if the caller's class loader is not the same as the + * class loader of above class. + * @throws NoSuchMethodException if the default constructor cannot be found for the given class type. + */ + public static Constructor getConstructor(Class clazz) + throws SecurityException, NoSuchMethodException { + if(clazz == null) { + throw new IllegalArgumentException("class cannot be null"); + } + Constructor cons = clazz.getConstructor(EMPTY_CLASS_ARRAY); + cons.setAccessible(true); + return cons; + } + + /** + * Returns whether the class defines an empty argument constructor. + * + * @param clazz class reference of given type T. + * @return boolean indicating whether an empty argument constructor exists for the given class type T. + * @throws SecurityException if the caller's class loader is not the same as the + * class loader of above class. + * @throws NoSuchMethodException if the default constructor cannot be found for the given class type. 
+ */ + public static boolean hasConstructor(Class clazz) + throws SecurityException, NoSuchMethodException { + if(clazz == null) { + throw new IllegalArgumentException("class cannot be null"); + } + Constructor[] consts = clazz.getConstructors(); + + boolean found = false; + for(Constructor cons : consts) { + if(cons.getParameterTypes().length == 0) { + found = true; + } + } + + return found; + } + + /** + * Constructs a new instance of the class using the no-arg constructor. + * + * @param clazz the class of the object. + * @param class type variable. + * @return a new instance of the object. + * @throws SecurityException if the caller's class loader is not the same as the + * class loader of above class. + * @throws IllegalArgumentException if the given class is null; the no-arg constructor itself + * is always invoked with an empty argument array. + * @throws IllegalAccessException if the constructor is inaccessible due to Java language access control. + * @throws InstantiationException if the class could not be instantiated from the given constructor. + * @throws NoSuchMethodException if the default constructor cannot be found for the given class type. + * @throws InvocationTargetException if the underlying constructor throws an exception. + */ + public static T newInstance(Class clazz) + throws InstantiationException, IllegalAccessException, + SecurityException, NoSuchMethodException, IllegalArgumentException, + InvocationTargetException { + + Constructor cons = getConstructor(clazz); + + return cons.newInstance(EMPTY_OBJECT_ARRAY); + } + + /** + * Constructs a new instance of the class using the no-arg constructor. + * + * @param classStr the class name of the object. + * @return a new instance of the object. + * @throws SecurityException if the caller's class loader is not the same as the + * class loader of above class. + * @throws IllegalArgumentException this will not be thrown since field.get(obj) passing obj is null. 
+ * since the field is a static class level variable inside the class. + * @throws IllegalAccessException if the constructor is inaccessible due to Java language access control. + * @throws ClassNotFoundException if the class definition cannot be found for the given class name. + * @throws InstantiationException if the class could not be instantiated from the given constructor. + * @throws NoSuchMethodException if the default constructor cannot be found for the given class type. + * @throws InvocationTargetException if the underlying constructor throws an exception. + */ + public static Object newInstance(String classStr) + throws InstantiationException, IllegalAccessException, + ClassNotFoundException, SecurityException, IllegalArgumentException, + NoSuchMethodException, InvocationTargetException { + if(classStr == null) { + throw new IllegalArgumentException("class cannot be null"); + } + Class clazz = ClassLoadingUtils.loadClass(classStr); + return newInstance(clazz); + } + + /** + * Returns the value of a named static field. + * + * @param clazz the class of the object. + * @param fieldName name of the static field whose value is required. + * @return the value of the named static field. + * @throws SecurityException if the caller's class loader is not the same as the + * class loader of above class. + * @throws NoSuchFieldException if a field with the specified name is not found. + * @throws IllegalArgumentException this will not be thrown since field.get(obj) passing obj is null + * since the field is a static class level variable inside the class. + * @throws IllegalAccessException if the field is inaccessible due to Java language access control. 
+ */ + public static Object getStaticField(Class clazz, String fieldName) + throws IllegalArgumentException, SecurityException, + IllegalAccessException, NoSuchFieldException { + + return clazz.getField(fieldName).get(null); + } + +} diff --git a/pom.xml b/pom.xml index 6d1414d..e371fe4 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 10 - org.apache.sdap.mudrod + org.apache.sdap mudrod-parent 0.0.1-SNAPSHOT pom @@ -135,8 +135,10 @@ + analysis core service + storage web @@ -416,6 +418,14 @@ + + + org.apache.sdap + mudrod-core + ${project.version} + + + org.elasticsearch diff --git a/service/pom.xml b/service/pom.xml index 8dd59a9..3d98af5 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -18,7 +18,7 @@ 4.0.0 - org.apache.sdap.mudrod + org.apache.sdap mudrod-parent 0.0.1-SNAPSHOT ../ @@ -40,14 +40,14 @@ - org.apache.sdap.mudrod + org.apache.sdap mudrod-core ${project.version} - org.apache.sdap.mudrod + org.apache.sdap mudrod-web ${project.version} diff --git a/storage/pom.xml b/storage/pom.xml new file mode 100644 index 0000000..448315b --- /dev/null +++ b/storage/pom.xml @@ -0,0 +1,142 @@ + + + + 4.0.0 + + + org.apache.sdap + mudrod-parent + 0.0.1-SNAPSHOT + ../ + + + mudrod-storage + + Mudrod :: Storage + Mudrod storage logic. 
+ + + + org.apache.sdap + mudrod-core + + + + com.google.code.gson + gson + + + + org.jdom + jdom + + + + + org.elasticsearch + elasticsearch + + + org.elasticsearch.client + transport + + + org.elasticsearch.plugin + transport-netty4-client + + + + + org.elasticsearch + elasticsearch-spark-20_2.11 + + + joda-time + joda-time + + + com.carrotsearch + hppc + + + + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + + + junit + junit + + + + + + + + + ${basedir}/../ + META-INF + + LICENSE.txt + NOTICE.txt + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + + + release + + + + ${basedir}/../ + + ${project.build.directory}/apidocs/META-INF + + + LICENSE.txt + NOTICE.txt + + + + + + + + diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java new file mode 100644 index 0000000..9a9d7cb --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.sdap.mudrod.storage.elasticsearch.ElasticSearchDriver; +import org.apache.sdap.mudrod.storage.solr.SolrDriver; + +/** + * Core storage datum from which all concrete storage-related + * implementations should extend. + * @since v0.0.1-SNAPSHOT + */ +public interface StorageDriver { + + /** + * + * @return an initialized {@link org.apache.sdap.mudrod.storage.StorageDriver} implementation. + */ + default StorageDriver initialize(Properties props) { + StorageDriver sDriver = null; + if (props != null) { + switch (props.getProperty("mudrod.storage.driver", "elasticsearch")) { + case "elasticsearch": + sDriver = new ElasticSearchDriver(props); + break; + default: + sDriver = new SolrDriver(props); + break; + } + } else { + + } + return sDriver; + } + + abstract void createBulkProcessor(); + + abstract void destroyBulkProcessor(); + + abstract void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException; + + abstract String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException; + + abstract String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException; + + abstract List customAnalyzing(String indexName, List list) throws InterruptedException, ExecutionException; + + //abstract void deleteAllByQuery(String index, String type, QueryBuilder query); + + abstract void deleteType(String index, String type); + + abstract List getTypeListWithPrefix(Object object, Object object2); + + abstract List getIndexListWithPrefix(Object object); + + abstract String searchByQuery(String index, String type, String query); + + abstract String searchByQuery(String index, String type, String query, Boolean bDetail); + + abstract List> buildMeasurementHierarchies( + List 
topics, List terms, List variables, + List variableDetails); + + abstract List autoComplete(String index, String term); + + abstract void close(); + + abstract void refreshIndex(); + + //abstract Client makeClient(Properties props); + + //abstract UpdateRequest generateUpdateRequest(String index, + // String type, String id, String field1, Object value1); + + //UpdateRequest generateUpdateRequest(String index, String type, String id, + // Map filedValueMap); + + abstract int getDocCount(String index, String... type); + + abstract int getDocCount(String[] index, String[] type); + + //public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch); + +} diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java new file mode 100644 index 0000000..ef3c80b --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.apache.sdap.mudrod.utils.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory for loading the default properties and instantiating {@link StorageDriver} implementations. + */ +public class StorageDriverFactory { + + public static final Logger log = LoggerFactory.getLogger(StorageDriverFactory.class); + + public static final String MUDROD_DEFAULT_PROPERTIES_FILE = "config.properties"; + + public static final String MUDROD_DEFAULT_DATASTORE_KEY = "mudrod.datastore.default"; + + private StorageDriverFactory() { } + + /** + * Creates a new {@link Properties}. It loads the default MUDROD configuration + * resource if it is found. This properties object can be modified and used to instantiate + * store instances. It is recommended to use a properties object for a single + * store, because the properties object is passed on to store initialization + * methods that are able to store the properties as a field. + * @return The new properties object. + */ + public static Properties createProps() { + try { + Properties properties = new Properties(); + InputStream stream = StorageDriverFactory.class.getClassLoader() + .getResourceAsStream(MUDROD_DEFAULT_PROPERTIES_FILE); + if(stream != null) { + try { + properties.load(stream); + return properties; + } finally { + stream.close(); + } + } else { + log.warn(MUDROD_DEFAULT_PROPERTIES_FILE + " not found, properties will be empty."); + } + return properties; + } catch(Exception e) { + throw new RuntimeException(e); + } + } + + private static void initializeStorageDriver( + StorageDriver storageDriver, Properties properties) throws IOException { + storageDriver.initialize(properties); + } + + /** + * Instantiates a new {@link StorageDriver}. + * + * @param storageDriverClass The storage driver implementation class. + * @param properties The properties to be used in the store. + * @return A new {@link org.apache.sdap.mudrod.storage.StorageDriver} instance. 
+ * @throws Exception + */ + public static StorageDriver createDataStore(Class storageDriverClass, Properties properties) throws Exception{ + try { + StorageDriver storageDriver = + (StorageDriver) ReflectionUtils.newInstance(storageDriverClass); + initializeStorageDriver(storageDriver, properties); + return storageDriver; + } catch(Exception ex) { + throw new Exception(ex); + } + } +} diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/ElasticSearchDriver.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/ElasticSearchDriver.java new file mode 100644 index 0000000..886070d --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/ElasticSearchDriver.java @@ -0,0 +1,636 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage.elasticsearch; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; +import java.net.InetAddress; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.storage.StorageDriver; +import org.apache.sdap.mudrod.utils.ESTransportClient; +import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse; +import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; 
+import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.Fuzziness; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.search.suggest.SuggestBuilders; +import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.google.gson.GsonBuilder; + +/** + * + */ +public class ElasticSearchDriver implements StorageDriver { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchDriver.class); + private transient Client client = null; + private transient Node node = null; + private transient BulkProcessor bulkProcessor = null; + + /** + * @param props + * + */ + public ElasticSearchDriver(Properties props) { + try { + setClient(makeClient(props)); + } catch (IOException e) { + LOG.error("Error whilst constructing Elastcisearch client.", e); + } + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#createBulkProcessor() + */ + @Override + public void createBulkProcessor() { + LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500); + setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + 
LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!"); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!"); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + LOG.error("Bulk request has failed!"); + throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure); + } + }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1) + .build()); + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#destroyBulkProcessor() + */ + @Override + public void destroyBulkProcessor() { + try { + getBulkProcessor().awaitClose(20, TimeUnit.MINUTES); + setBulkProcessor(null); + refreshIndex(); + } catch (InterruptedException e) { + LOG.error("Error destroying the Bulk Processor.", e); + } + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#putMapping(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException { + boolean exists = getClient() + .admin() + .indices() + .prepareExists(indexName) + .execute() + .actionGet() + .isExists(); + if (exists) { + return; + } + + getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet(); + getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet(); + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.lang.String) + */ + @Override + public String 
customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException { + return this.customAnalyzing(indexName, "cody", str); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException { + String[] strList = str.toLowerCase().split(","); + for (int i = 0; i < strList.length; i++) { + String tmp = ""; + AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get(); + for (AnalyzeToken token : r.getTokens()) { + tmp += token.getTerm() + " "; + } + strList[i] = tmp.trim(); + } + return String.join(",", strList); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.util.List) + */ + @Override + public List customAnalyzing(String indexName, List list) throws InterruptedException, ExecutionException { + if (list == null) { + return list; + } + List customlist = new ArrayList<>(); + for (String aList : list) { + customlist.add(this.customAnalyzing(indexName, aList)); + } + + return customlist; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#deleteType(java.lang.String, java.lang.String) + */ + @Override + public void deleteType(String index, String type) { + this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery()); + + } + + public void deleteAllByQuery(String index, String type, QueryBuilder query) { + ImmutableOpenMap mappings = getClient().admin() + .cluster().prepareState().execute().actionGet() + .getState().metaData().index(index).getMappings(); + + //check if the type exists + if (!mappings.containsKey(type)) + return; + + createBulkProcessor(); + SearchResponse scrollResp = getClient().prepareSearch(index).setSearchType( + 
SearchType.QUERY_AND_FETCH).setTypes(type).setScroll( + new TimeValue(60000)).setQuery(query).setSize(10000).execute().actionGet(); + + while (true) { + for (SearchHit hit : scrollResp.getHits().getHits()) { + DeleteRequest deleteRequest = new DeleteRequest(index, type, hit.getId()); + getBulkProcessor().add(deleteRequest); + } + + scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(new TimeValue(600000)).execute().actionGet(); + if (scrollResp.getHits().getHits().length == 0) { + break; + } + + } + destroyBulkProcessor(); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getTypeListWithPrefix(java.lang.Object, java.lang.Object) + */ + @Override + public List getTypeListWithPrefix(Object object, Object object2) { + ArrayList typeList = new ArrayList<>(); + GetMappingsResponse res; + try { + res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get(); + ImmutableOpenMap mapping = res.mappings().get(object.toString()); + for (ObjectObjectCursor c : mapping) { + if (c.key.startsWith(object2.toString())) { + typeList.add(c.key); + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e); + } + return typeList; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getIndexListWithPrefix(java.lang.Object) + */ + @Override + public List getIndexListWithPrefix(Object object) { + LOG.info("Retrieving index list with prefix: {}", object.toString()); + String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices(); + + ArrayList indexList = new ArrayList<>(); + for (String indexName : indices) { + if (indexName.startsWith(object.toString())) { + indexList.add(indexName); + } + } + + return indexList; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#searchByQuery(java.lang.String, 
java.lang.String, java.lang.String) + */ + @Override + public String searchByQuery(String index, String type, String query) { + return searchByQuery(index, type, query, false); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#searchByQuery(java.lang.String, java.lang.String, java.lang.String, java.lang.Boolean) + */ + @Override + public String searchByQuery(String index, String type, String query, Boolean bDetail) { + boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); + if (!exists) { + return null; + } + + QueryBuilder qb = QueryBuilders.queryStringQuery(query); + SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet(); + + // Map of K,V pairs where key is the field name from search result and value is the that should be returned for that field. Not always the same. + Map fieldsToReturn = new HashMap<>(); + + fieldsToReturn.put("Dataset-ShortName", "Short Name"); + fieldsToReturn.put("Dataset-LongName", "Long Name"); + fieldsToReturn.put("DatasetParameter-Topic", "Topic"); + fieldsToReturn.put("Dataset-Description", "Dataset-Description"); + fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date"); + + if (bDetail) { + fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat"); + fieldsToReturn.put("Dataset-Doi", "Dataset-Doi"); + fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level"); + fieldsToReturn.put("DatasetCitation-Version", "Version"); + fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName"); + fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName"); + fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category"); + fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath"); + fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full"); + 
fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full"); + fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail"); + + fieldsToReturn.put("DatasetRegion-Region", "Region"); + fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat"); + fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat"); + fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon"); + fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon"); + fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long"); + fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong"); + + fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution"); + fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat"); + fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution"); + fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution"); + fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution"); + fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution"); + } + + List> searchResults = new ArrayList<>(); + + for (SearchHit hit : response.getHits().getHits()) { + Map source = hit.getSource(); + + Map searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey())) + .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue)); + + // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result + + // Some results require special handling/formatting: + // Release Date formatting + LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate(); + searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE)); + + if (bDetail) { + + // DataFormat 
value, translate RAW to BINARY + if ("RAW".equals(searchResult.get("DataFormat"))) { + searchResult.put("DataFormat", "BINARY"); + } + + // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs + List urls = ((List) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList()); + searchResult.put("DatasetLocationPolicy-BasePath", urls); + + // Time Span Formatting + LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate(); + LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ? + null : + Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate(); + searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE))); + + // Temporal resolution can come from one of two fields + searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? 
searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution")); + + // Special formatting for spatial resolution + String latResolution = (String) searchResult.get("Dataset-LatitudeResolution"); + String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution"); + if (!latResolution.isEmpty() && !lonResolution.isEmpty()) { + searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)"); + } else { + String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution"); + String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution"); + double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000; + double dAlonResolution = Double.parseDouble(alonResolution) / 1000; + searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)"); + } + + // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies. + List> measurements = buildMeasurementHierarchies((List) searchResult.get("Topic"), (List) searchResult.get("DatasetParameter-Term-Full"), + (List) searchResult.get("DatasetParameter-Variable-Full"), (List) searchResult.get("DatasetParameter-VariableDetail")); + + searchResult.put("Measurements", measurements); + + } + + searchResults.add(searchResult); + } + + Map> pdResults = new HashMap<>(); + pdResults.put("PDResults", searchResults); + + return new GsonBuilder().create().toJson(pdResults); + } + + /** + * Builds a List of Measurement Hierarchies given the individual source lists. + * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail + * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete. 
+ * + * For example, if the input is: + *
+   * topics = ["Oceans", "Oceans"]
+   * terms = ["Sea Surface Topography", "Ocean Waves"]
+   * variables = ["Sea Surface Height", "Significant Wave Height"]
+   * variableDetails = ["None", "None"]
+   * 
+ * + * The output would be: + *
+   *   [
+   *     ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
+   *     ["Oceans", "Ocean Waves", "Significant Wave Height"]
+   *   ]
+   * 
+ * Oceans > Sea Surface Topography > Sea Surface Height + * Oceans > Ocean Waves > Significant Wave Height + * + * @param topics List of topics, the first element of a measurement + * @param terms List of terms, the second element of a measurement + * @param variables List of variables, the third element of a measurement + * @param variableDetails List of variable details, the fourth element of a measurement + * + * @return A List where each element is a single hierarchy (as a List) built from the provided input lists. + */ + @Override + public List> buildMeasurementHierarchies(List topics, List terms, List variables, List variableDetails) { + List> measurements = new ArrayList<>(); + + for (int x = 0; x < topics.size(); x++) { + measurements.add(new ArrayList<>()); + measurements.get(x).add(topics.get(x)); + // Only add the next 'level' if we can + if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) { + measurements.get(x).add(terms.get(x)); + if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) { + measurements.get(x).add(variables.get(x)); + if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) { + measurements.get(x).add(variableDetails.get(x)); + } + } + } + } + + return measurements; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#autoComplete(java.lang.String, java.lang.String) + */ + @Override + public List autoComplete(String index, String term) { + boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); + if (!exists) { + return new ArrayList<>(); + } + + Set suggestHS = new HashSet<>(); + List suggestList = new ArrayList<>(); + + // please make sure that the completion field is configured in the ES mapping + CompletionSuggestionBuilder suggestionsBuilder = 
SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100); + SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder)); + SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet(); + + Iterator iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator(); + + while (iterator.hasNext()) { + Suggest.Suggestion.Entry.Option next = iterator.next(); + String suggest = next.getText().string().toLowerCase(); + suggestList.add(suggest); + } + + suggestHS.addAll(suggestList); + suggestList.clear(); + suggestList.addAll(suggestHS); + return suggestList; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#close() + */ + @Override + public void close() { + client.close(); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#refreshIndex() + */ + @Override + public void refreshIndex() { + client.admin().indices().prepareRefresh().execute().actionGet(); + } + + /** + * Generates a TransportClient or NodeClient + * + * @param props a populated {@link java.util.Properties} object + * @return a constructed {@link org.elasticsearch.client.Client} + * @throws IOException if there is an error building the + * {@link org.elasticsearch.client.Client} + */ + protected Client makeClient(Properties props) throws IOException { + String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER); + String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS); + String[] hosts = hostsString.split(","); + String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT); + int port = Integer.parseInt(portStr); + + Settings.Builder settingsBuilder = Settings.builder(); + + // Set the cluster name and build the settings + if (!clusterName.isEmpty()) + settingsBuilder.put("cluster.name", clusterName); + + 
settingsBuilder.put("http.type", "netty3"); + settingsBuilder.put("transport.type", "netty3"); + + Settings settings = settingsBuilder.build(); + + Client client = null; + + // Prefer TransportClient + if (hosts != null && port > 1) { + TransportClient transportClient = new ESTransportClient(settings); + for (String host : hosts) + transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + client = transportClient; + } else if (clusterName != null) { + node = new Node(settings); + client = node.client(); + } + + return client; + } + + /** + * @return the client + */ + public Client getClient() { + return client; + } + + /** + * @param client the client to set + */ + public void setClient(Client client) { + this.client = client; + } + + /** + * @return the bulkProcessor + */ + public BulkProcessor getBulkProcessor() { + return bulkProcessor; + } + + /** + * @param bulkProcessor the bulkProcessor to set + */ + public void setBulkProcessor(BulkProcessor bulkProcessor) { + this.bulkProcessor = bulkProcessor; + } + + public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) { + + UpdateRequest ur = null; + try { + ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); + } catch (IOException e) { + LOG.error("Error whilst attempting to generate a new Update Request.", e); + } + + return ur; + } + + public UpdateRequest generateUpdateRequest(String index, String type, String id, Map filedValueMap) { + + UpdateRequest ur = null; + try { + XContentBuilder builder = jsonBuilder().startObject(); + for (Entry entry : filedValueMap.entrySet()) { + String key = entry.getKey(); + builder.field(key, filedValueMap.get(key)); + } + builder.endObject(); + ur = new UpdateRequest(index, type, id).doc(builder); + } catch (IOException e) { + LOG.error("Error whilst attempting to generate a new Update Request.", e); + } + + return ur; + } 
+ + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getDocCount(java.lang.String, java.lang.String[]) + */ + @Override + public int getDocCount(String index, String... type) { + MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); + String[] indexArr = new String[] { index }; + return this.getDocCount(indexArr, type, search); + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getDocCount(java.lang.String[], java.lang.String[]) + */ + @Override + public int getDocCount(String[] index, String[] type) { + MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); + return this.getDocCount(index, type, search); + } + + public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) { + SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0); + SearchResponse countSr = countSrBuilder.execute().actionGet(); + int docCount = (int) countSr.getHits().getTotalHits(); + return docCount; + } + +} diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/package-info.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/package-info.java new file mode 100644 index 0000000..107ff51 --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/elasticsearch/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage.elasticsearch; \ No newline at end of file diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/package-info.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/package-info.java new file mode 100644 index 0000000..af07a01 --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sdap.mudrod.storage; \ No newline at end of file diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/SolrDriver.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/SolrDriver.java new file mode 100644 index 0000000..58fac37 --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/SolrDriver.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage.solr; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.sdap.mudrod.storage.StorageDriver; + +/** + * + */ +public class SolrDriver implements StorageDriver { + + /** + * @param props + * + */ + public SolrDriver(Properties props) { + // TODO Auto-generated constructor stub + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#createBulkProcessor() + */ + @Override + public void createBulkProcessor() { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#destroyBulkProcessor() + */ + @Override + public void destroyBulkProcessor() { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#putMapping(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.lang.String) + */ + @Override + public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#customAnalyzing(java.lang.String, java.util.List) + */ + @Override + public List customAnalyzing(String indexName, List list) throws 
InterruptedException, ExecutionException { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#deleteType(java.lang.String, java.lang.String) + */ + @Override + public void deleteType(String index, String type) { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getTypeListWithPrefix(java.lang.Object, java.lang.Object) + */ + @Override + public List getTypeListWithPrefix(Object object, Object object2) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getIndexListWithPrefix(java.lang.Object) + */ + @Override + public List getIndexListWithPrefix(Object object) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#searchByQuery(java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public String searchByQuery(String index, String type, String query) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#searchByQuery(java.lang.String, java.lang.String, java.lang.String, java.lang.Boolean) + */ + @Override + public String searchByQuery(String index, String type, String query, Boolean bDetail) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#buildMeasurementHierarchies(java.util.List, java.util.List, java.util.List, java.util.List) + */ + @Override + public List> buildMeasurementHierarchies(List topics, List terms, List variables, List variableDetails) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#autoComplete(java.lang.String, java.lang.String) + */ + @Override + public List autoComplete(String index, 
String term) { + // TODO Auto-generated method stub + return null; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#close() + */ + @Override + public void close() { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#refreshIndex() + */ + @Override + public void refreshIndex() { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getDocCount(java.lang.String, java.lang.String[]) + */ + @Override + public int getDocCount(String index, String... type) { + // TODO Auto-generated method stub + return 0; + } + + /* (non-Javadoc) + * @see org.apache.sdap.mudrod.storage.StorageDriver#getDocCount(java.lang.String[], java.lang.String[]) + */ + @Override + public int getDocCount(String[] index, String[] type) { + // TODO Auto-generated method stub + return 0; + } + +} diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/package-info.java b/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/package-info.java new file mode 100644 index 0000000..1c48a7e --- /dev/null +++ b/storage/src/main/java/org/apache/sdap/mudrod/storage/solr/package-info.java @@ -0,0 +1,14 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sdap.mudrod.storage.solr; \ No newline at end of file diff --git a/web/pom.xml b/web/pom.xml index 3b637b0..e6f58da 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -17,7 +17,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.apache.sdap.mudrod + org.apache.sdap mudrod-web 0.0.1-SNAPSHOT From 5b38ed61a006a0390244e9481f40f848b5aa9c4a Mon Sep 17 00:00:00 2001 From: jjacob Date: Wed, 6 Jun 2018 11:35:23 -0700 Subject: [PATCH 2/2] Progress on SDAP-13 --- .../MetadataDiscoveryEngine.java | 2 +- .../WeblogDiscoveryEngine.java | 4 +- .../apache/sdap/mudrod/driver/ESDriver.java | 592 ------------------ .../sdap/mudrod/driver}/StorageDriver.java | 0 .../mudrod/driver}/StorageDriverFactory.java | 2 +- .../sdap/mudrod/main/MudrodConstants.java | 2 + .../apache/sdap/mudrod/main/MudrodEngine.java | 61 +- .../sdap/mudrod/utils/ClassLoadingUtils.java | 2 +- core/src/main/resources/config.properties | 5 +- pom.xml | 1 - web/pom.xml | 1 - 11 files changed, 45 insertions(+), 627 deletions(-) delete mode 100644 core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java rename {storage/src/main/java/org/apache/sdap/mudrod/storage => core/src/main/java/org/apache/sdap/mudrod/driver}/StorageDriver.java (100%) rename {storage/src/main/java/org/apache/sdap/mudrod/storage => core/src/main/java/org/apache/sdap/mudrod/driver}/StorageDriverFactory.java (98%) diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java index a069f96..5a96133 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/MetadataDiscoveryEngine.java @@ -35,7 +35,7 @@ public class MetadataDiscoveryEngine extends DiscoveryEngineAbstract implements private static final long serialVersionUID = 1L; 
private static final Logger LOG = LoggerFactory.getLogger(MetadataDiscoveryEngine.class); - public MetadataDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) { + public MetadataDiscoveryEngine(Properties props, StorageDriver es, SparkDriver spark) { super(props, es, spark); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java index b2cdb9f..d20e57e 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java +++ b/core/src/main/java/org/apache/sdap/mudrod/discoveryengine/WeblogDiscoveryEngine.java @@ -17,9 +17,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.sdap.mudrod.driver.ESDriver; import org.apache.sdap.mudrod.driver.SparkDriver; import org.apache.sdap.mudrod.main.MudrodConstants; +import org.apache.sdap.mudrod.storage.StorageDriver; import org.apache.sdap.mudrod.weblog.pre.*; import org.apache.sdap.mudrod.weblog.process.ClickStreamAnalyzer; import org.apache.sdap.mudrod.weblog.process.UserHistoryAnalyzer; @@ -46,7 +46,7 @@ public class WeblogDiscoveryEngine extends DiscoveryEngineAbstract { private static final Logger LOG = LoggerFactory.getLogger(WeblogDiscoveryEngine.class); public String timeSuffix = null; - public WeblogDiscoveryEngine(Properties props, ESDriver es, SparkDriver spark) { + public WeblogDiscoveryEngine(Properties props, StorageDriver es, SparkDriver spark) { super(props, es, spark); LOG.info("Started Mudrod Weblog Discovery Engine."); } diff --git a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java deleted file mode 100644 index fc51557..0000000 --- a/core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java +++ /dev/null @@ -1,592 +0,0 @@ -/* - * Licensed under the Apache License,
Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sdap.mudrod.driver; - -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import com.google.gson.GsonBuilder; - -import org.apache.commons.lang.StringUtils; -import org.apache.sdap.mudrod.main.MudrodConstants; -import org.apache.sdap.mudrod.main.MudrodEngine; -import org.apache.sdap.mudrod.utils.ESTransportClient; -import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse; -import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.AnalyzeToken; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import 
org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.Fuzziness; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.node.Node; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.suggest.Suggest; -import org.elasticsearch.search.suggest.SuggestBuilder; -import org.elasticsearch.search.suggest.SuggestBuilders; -import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.net.InetAddress; -import java.time.Instant; -import java.time.LocalDate; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -/** - * Driver implementation for all Elasticsearch functionality. - */ -public class ESDriver implements Serializable { - - private static final Logger LOG = LoggerFactory.getLogger(ESDriver.class); - private static final long serialVersionUID = 1L; - private transient Client client = null; - private transient Node node = null; - private transient BulkProcessor bulkProcessor = null; - - /** - * Default constructor for this class. To load client configuration call - * substantiated constructor. 
- */ - public ESDriver() { - // Default constructor, to load configuration call ESDriver(props) - } - - /** - * Substantiated constructor which accepts a {@link java.util.Properties} - * - * @param props a populated properties object. - */ - public ESDriver(Properties props) { - try { - setClient(makeClient(props)); - } catch (IOException e) { - LOG.error("Error whilst constructing Elastcisearch client.", e); - } - } - - public void createBulkProcessor() { - LOG.debug("Creating BulkProcessor with maxBulkDocs={}, maxBulkLength={}", 1000, 2500500); - setBulkProcessor(BulkProcessor.builder(getClient(), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - LOG.debug("ESDriver#createBulkProcessor @Override #beforeBulk is not implemented yet!"); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - LOG.debug("ESDriver#createBulkProcessor @Override #afterBulk is not implemented yet!"); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Bulk request has failed!"); - throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure); - } - }).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1) - .build()); - } - - public void destroyBulkProcessor() { - try { - getBulkProcessor().awaitClose(10, TimeUnit.MINUTES); - setBulkProcessor(null); - refreshIndex(); - } catch (InterruptedException e) { - LOG.error("Error destroying the Bulk Processor.", e); - } - } - - public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException { - - boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists(); - if (exists) { - return; - } - - 
getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet(); - getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet(); - } - - public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException { - return this.customAnalyzing(indexName, "cody", str); - } - - public String customAnalyzing(String indexName, String analyzer, String str) throws InterruptedException, ExecutionException { - String[] strList = str.toLowerCase().split(","); - for (int i = 0; i < strList.length; i++) { - String tmp = ""; - AnalyzeResponse r = client.admin().indices().prepareAnalyze(strList[i]).setIndex(indexName).setAnalyzer(analyzer).execute().get(); - for (AnalyzeToken token : r.getTokens()) { - tmp += token.getTerm() + " "; - } - strList[i] = tmp.trim(); - } - return String.join(",", strList); - } - - public List customAnalyzing(String indexName, List list) throws InterruptedException, ExecutionException { - if (list == null) { - return list; - } - List customlist = new ArrayList<>(); - for (String aList : list) { - customlist.add(this.customAnalyzing(indexName, aList)); - } - - return customlist; - } - - public void deleteAllByQuery(String index, String type, QueryBuilder query) { - ImmutableOpenMap mappings = getClient() - .admin() - .cluster() - .prepareState() - .execute() - .actionGet() - .getState() - .metaData() - .index(index) - .getMappings(); - - //check if the type exists - if (!mappings.containsKey(type)) - return; - - createBulkProcessor(); - SearchResponse scrollResp = getClient() - .prepareSearch(index) - .setSearchType(SearchType.QUERY_AND_FETCH) - .setTypes(type) - .setScroll(new TimeValue(60000)) - .setQuery(query) - .setSize(10000) - .execute() - .actionGet(); - - while (true) { - for (SearchHit hit : scrollResp.getHits().getHits()) { - DeleteRequest deleteRequest = new 
DeleteRequest(index, type, hit.getId()); - getBulkProcessor().add(deleteRequest); - } - - scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); - if (scrollResp.getHits().getHits().length == 0) { - break; - } - - } - destroyBulkProcessor(); - } - - public void deleteType(String index, String type) { - this.deleteAllByQuery(index, type, QueryBuilders.matchAllQuery()); - } - - public List getTypeListWithPrefix(Object object, Object object2) { - ArrayList typeList = new ArrayList<>(); - GetMappingsResponse res; - try { - res = getClient().admin().indices().getMappings(new GetMappingsRequest().indices(object.toString())).get(); - ImmutableOpenMap mapping = res.mappings().get(object.toString()); - for (ObjectObjectCursor c : mapping) { - if (c.key.startsWith(object2.toString())) { - typeList.add(c.key); - } - } - } catch (InterruptedException | ExecutionException e) { - LOG.error("Error whilst obtaining type list from Elasticsearch mappings.", e); - } - return typeList; - } - - public List getIndexListWithPrefix(Object object) { - - LOG.info("Retrieving index list with prefix: {}", object.toString()); - String[] indices = client.admin().indices().getIndex(new GetIndexRequest()).actionGet().getIndices(); - - ArrayList indexList = new ArrayList<>(); - for (String indexName : indices) { - if (indexName.startsWith(object.toString())) { - indexList.add(indexName); - } - } - - return indexList; - } - - public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException { - return searchByQuery(index, type, query, false); - } - - @SuppressWarnings("unchecked") - public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException { - boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); - if (!exists) { - return null; - 
} - - QueryBuilder qb = QueryBuilders.queryStringQuery(query); - SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet(); - - // Map of K,V pairs where key is the field name from search result and value is the that should be returned for that field. Not always the same. - Map fieldsToReturn = new HashMap<>(); - - fieldsToReturn.put("Dataset-ShortName", "Short Name"); - fieldsToReturn.put("Dataset-LongName", "Long Name"); - fieldsToReturn.put("DatasetParameter-Topic", "Topic"); - fieldsToReturn.put("Dataset-Description", "Dataset-Description"); - fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date"); - - if (bDetail) { - fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat"); - fieldsToReturn.put("Dataset-Doi", "Dataset-Doi"); - fieldsToReturn.put("Dataset-ProcessingLevel", "Processing Level"); - fieldsToReturn.put("DatasetCitation-Version", "Version"); - fieldsToReturn.put("DatasetSource-Sensor-ShortName", "DatasetSource-Sensor-ShortName"); - fieldsToReturn.put("DatasetProject-Project-ShortName", "DatasetProject-Project-ShortName"); - fieldsToReturn.put("DatasetParameter-Category", "DatasetParameter-Category"); - fieldsToReturn.put("DatasetLocationPolicy-BasePath", "DatasetLocationPolicy-BasePath"); - fieldsToReturn.put("DatasetParameter-Variable-Full", "DatasetParameter-Variable-Full"); - fieldsToReturn.put("DatasetParameter-Term-Full", "DatasetParameter-Term-Full"); - fieldsToReturn.put("DatasetParameter-VariableDetail", "DatasetParameter-VariableDetail"); - - fieldsToReturn.put("DatasetRegion-Region", "Region"); - fieldsToReturn.put("DatasetCoverage-NorthLat", "NorthLat"); - fieldsToReturn.put("DatasetCoverage-SouthLat", "SouthLat"); - fieldsToReturn.put("DatasetCoverage-WestLon", "WestLon"); - fieldsToReturn.put("DatasetCoverage-EastLon", "EastLon"); - fieldsToReturn.put("DatasetCoverage-StartTimeLong-Long", "DatasetCoverage-StartTimeLong-Long"); - 
fieldsToReturn.put("Dataset-DatasetCoverage-StopTimeLong", "Dataset-DatasetCoverage-StopTimeLong"); - - fieldsToReturn.put("Dataset-TemporalResolution", "Dataset-TemporalResolution"); - fieldsToReturn.put("Dataset-TemporalRepeat", "Dataset-TemporalRepeat"); - fieldsToReturn.put("Dataset-LatitudeResolution", "Dataset-LatitudeResolution"); - fieldsToReturn.put("Dataset-LongitudeResolution", "Dataset-LongitudeResolution"); - fieldsToReturn.put("Dataset-AcrossTrackResolution", "Dataset-AcrossTrackResolution"); - fieldsToReturn.put("Dataset-AlongTrackResolution", "Dataset-AlongTrackResolution"); - } - - List> searchResults = new ArrayList<>(); - - for (SearchHit hit : response.getHits().getHits()) { - Map source = hit.getSource(); - - Map searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey())) - .collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue)); - - // searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result - - // Some results require special handling/formatting: - // Release Date formatting - LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate(); - searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE)); - - if (bDetail) { - - // DataFormat value, translate RAW to BINARY - if ("RAW".equals(searchResult.get("DataFormat"))) { - searchResult.put("DataFormat", "BINARY"); - } - - // DatasetLocationPolicy-BasePath Should only contain ftp, http, or https URLs - List urls = ((List) searchResult.get("DatasetLocationPolicy-BasePath")).stream().filter(url -> url.startsWith("ftp") || url.startsWith("http")).collect(Collectors.toList()); - searchResult.put("DatasetLocationPolicy-BasePath", urls); - - // Time Span Formatting - LocalDate startDate = Instant.ofEpochMilli((Long) 
searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate(); - LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ? - null : - Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate(); - searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE))); - - // Temporal resolution can come from one of two fields - searchResult.put("TemporalResolution", "".equals(searchResult.get("Dataset-TemporalResolution")) ? searchResult.get("Dataset-TemporalRepeat") : searchResult.get("Dataset-TemporalResolution")); - - // Special formatting for spatial resolution - String latResolution = (String) searchResult.get("Dataset-LatitudeResolution"); - String lonResolution = (String) searchResult.get("Dataset-LongitudeResolution"); - if (!latResolution.isEmpty() && !lonResolution.isEmpty()) { - searchResult.put("SpatialResolution", latResolution + " degrees (latitude) x " + lonResolution + " degrees (longitude)"); - } else { - String acrossResolution = (String) searchResult.get("Dataset-AcrossTrackResolution"); - String alonResolution = (String) searchResult.get("Dataset-AlongTrackResolution"); - double dAcrossResolution = Double.parseDouble(acrossResolution) / 1000; - double dAlonResolution = Double.parseDouble(alonResolution) / 1000; - searchResult.put("SpatialResolution", dAlonResolution + " km (Along) x " + dAcrossResolution + " km (Across)"); - } - - // Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies. 
- List> measurements = buildMeasurementHierarchies((List) searchResult.get("Topic"), (List) searchResult.get("DatasetParameter-Term-Full"), - (List) searchResult.get("DatasetParameter-Variable-Full"), (List) searchResult.get("DatasetParameter-VariableDetail")); - - searchResult.put("Measurements", measurements); - - } - - searchResults.add(searchResult); - } - - Map> pdResults = new HashMap<>(); - pdResults.put("PDResults", searchResults); - - return new GsonBuilder().create().toJson(pdResults); - } - - /** - * Builds a List of Measurement Hierarchies given the individual source lists. - * The hierarchy is built from the element in the same position from each input list in the order: Topic -> Term -> Variable -> VariableDetail - * "None" and blank strings are ignored. If, at any level, an element does not exist for that position or it is "None" or blank, that hierarchy is considered complete. - * - * For example, if the input is: - *
-   * topics = ["Oceans", "Oceans"]
-   * terms = ["Sea Surface Topography", "Ocean Waves"]
-   * variables = ["Sea Surface Height", "Significant Wave Height"]
-   * variableDetails = ["None", "None"]
-   * 
- * - * The output would be: - *
-   *   [
-   *     ["Oceans", "Sea Surface Topography", "Sea Surface Height"],
-   *     ["Oceans", "Ocean Waves", "Significant Wave Height"]
-   *   ]
-   * 
- * Oceans > Sea Surface Topography > Sea Surface Height - * Oceans > Ocean Waves > Significant Wave Height - * - * @param topics List of topics, the first element of a measurement - * @param terms List of terms, the second element of a measurement - * @param variables List of variables, the third element of a measurement - * @param variableDetails List of variable details, the fourth element of a measurement - * - * @return A List where each element is a single hierarchy (as a List) built from the provided input lists. - */ - private List> buildMeasurementHierarchies(List topics, List terms, List variables, List variableDetails) { - - List> measurements = new ArrayList<>(); - - for (int x = 0; x < topics.size(); x++) { - measurements.add(new ArrayList<>()); - measurements.get(x).add(topics.get(x)); - // Only add the next 'level' if we can - if (x < terms.size() && !"None".equalsIgnoreCase(terms.get(x)) && StringUtils.isNotBlank(terms.get(x))) { - measurements.get(x).add(terms.get(x)); - if (x < variables.size() && !"None".equalsIgnoreCase(variables.get(x)) && StringUtils.isNotBlank(variables.get(x))) { - measurements.get(x).add(variables.get(x)); - if (x < variableDetails.size() && !"None".equalsIgnoreCase(variableDetails.get(x)) && StringUtils.isNotBlank(variableDetails.get(x))) { - measurements.get(x).add(variableDetails.get(x)); - } - } - } - } - - return measurements; - - } - - public List autoComplete(String index, String term) { - boolean exists = this.getClient().admin().indices().prepareExists(index).execute().actionGet().isExists(); - if (!exists) { - return new ArrayList<>(); - } - - Set suggestHS = new HashSet(); - List suggestList = new ArrayList<>(); - - // please make sure that the completion field is configured in the ES mapping - CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100); - SearchRequestBuilder suggestRequestBuilder = 
getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder)); - SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet(); - - Iterator iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator(); - - while (iterator.hasNext()) { - Suggest.Suggestion.Entry.Option next = iterator.next(); - String suggest = next.getText().string().toLowerCase(); - suggestList.add(suggest); - } - - suggestHS.addAll(suggestList); - suggestList.clear(); - suggestList.addAll(suggestHS); - return suggestList; - } - - public void close() { - client.close(); - } - - public void refreshIndex() { - client.admin().indices().prepareRefresh().execute().actionGet(); - } - - /** - * Generates a TransportClient or NodeClient - * - * @param props a populated {@link java.util.Properties} object - * @return a constructed {@link org.elasticsearch.client.Client} - * @throws IOException if there is an error building the - * {@link org.elasticsearch.client.Client} - */ - protected Client makeClient(Properties props) throws IOException { - String clusterName = props.getProperty(MudrodConstants.ES_CLUSTER); - String hostsString = props.getProperty(MudrodConstants.ES_UNICAST_HOSTS); - String[] hosts = hostsString.split(","); - String portStr = props.getProperty(MudrodConstants.ES_TRANSPORT_TCP_PORT); - int port = Integer.parseInt(portStr); - - Settings.Builder settingsBuilder = Settings.builder(); - - // Set the cluster name and build the settings - if (!clusterName.isEmpty()) - settingsBuilder.put("cluster.name", clusterName); - - settingsBuilder.put("http.type", "netty3"); - settingsBuilder.put("transport.type", "netty3"); - - Settings settings = settingsBuilder.build(); - - Client client = null; - - // Prefer TransportClient - if (hosts != null && port > 1) { - TransportClient transportClient = new ESTransportClient(settings); - for (String host : hosts) - 
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); - client = transportClient; - } else if (clusterName != null) { - node = new Node(settings); - client = node.client(); - } - - return client; - } - - /** - * Main method used to invoke the ESDriver implementation. - * - * @param args no arguments are required to invoke the Driver. - */ - public static void main(String[] args) { - MudrodEngine mudrodEngine = new MudrodEngine(); - ESDriver es = new ESDriver(mudrodEngine.loadConfig()); - es.getTypeListWithPrefix("podaacsession", "sessionstats"); - } - - /** - * @return the client - */ - public Client getClient() { - return client; - } - - /** - * @param client the client to set - */ - public void setClient(Client client) { - this.client = client; - } - - /** - * @return the bulkProcessor - */ - public BulkProcessor getBulkProcessor() { - return bulkProcessor; - } - - /** - * @param bulkProcessor the bulkProcessor to set - */ - public void setBulkProcessor(BulkProcessor bulkProcessor) { - this.bulkProcessor = bulkProcessor; - } - - public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) { - - UpdateRequest ur = null; - try { - ur = new UpdateRequest(index, type, id).doc(jsonBuilder().startObject().field(field1, value1).endObject()); - } catch (IOException e) { - LOG.error("Error whilst attempting to generate a new Update Request.", e); - } - - return ur; - } - - public UpdateRequest generateUpdateRequest(String index, String type, String id, Map filedValueMap) { - - UpdateRequest ur = null; - try { - XContentBuilder builder = jsonBuilder().startObject(); - for (Entry entry : filedValueMap.entrySet()) { - String key = entry.getKey(); - builder.field(key, filedValueMap.get(key)); - } - builder.endObject(); - ur = new UpdateRequest(index, type, id).doc(builder); - } catch (IOException e) { - LOG.error("Error whilst attempting to generate a new Update Request.", e); 
- } - - return ur; - } - - public int getDocCount(String index, String... type) { - MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); - String[] indexArr = new String[] { index }; - return this.getDocCount(indexArr, type, search); - } - - public int getDocCount(String[] index, String[] type) { - MatchAllQueryBuilder search = QueryBuilders.matchAllQuery(); - return this.getDocCount(index, type, search); - } - - public int getDocCount(String[] index, String[] type, QueryBuilder filterSearch) { - SearchRequestBuilder countSrBuilder = getClient().prepareSearch(index).setTypes(type).setQuery(filterSearch).setSize(0); - SearchResponse countSr = countSrBuilder.execute().actionGet(); - int docCount = (int) countSr.getHits().getTotalHits(); - return docCount; - } -} diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java b/core/src/main/java/org/apache/sdap/mudrod/driver/StorageDriver.java similarity index 100% rename from storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriver.java rename to core/src/main/java/org/apache/sdap/mudrod/driver/StorageDriver.java diff --git a/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java b/core/src/main/java/org/apache/sdap/mudrod/driver/StorageDriverFactory.java similarity index 98% rename from storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java rename to core/src/main/java/org/apache/sdap/mudrod/driver/StorageDriverFactory.java index ef3c80b..93b7f61 100644 --- a/storage/src/main/java/org/apache/sdap/mudrod/storage/StorageDriverFactory.java +++ b/core/src/main/java/org/apache/sdap/mudrod/driver/StorageDriverFactory.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.sdap.mudrod.storage; +package org.apache.sdap.mudrod.driver; import java.io.IOException; import java.io.InputStream; diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java index 2a43e4d..97d59a1 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java +++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodConstants.java @@ -137,6 +137,8 @@ public interface MudrodConstants { public static final String SPARK_APP_NAME = "mudrod.spark.app.name"; public static final String SPARK_MASTER = "mudrod.spark.master"; + + public static final String STORAGE_DRIVER = "mudrod.storage.driver"; public static final String RANKING_MODEL = "mudrod.ranking.model"; diff --git a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java index 80b0856..c10a7e4 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java +++ b/core/src/main/java/org/apache/sdap/mudrod/main/MudrodEngine.java @@ -27,9 +27,11 @@ import org.apache.sdap.mudrod.discoveryengine.OntologyDiscoveryEngine; import org.apache.sdap.mudrod.discoveryengine.RecommendEngine; import org.apache.sdap.mudrod.discoveryengine.WeblogDiscoveryEngine; -import org.apache.sdap.mudrod.driver.ESDriver; +import org.apache.sdap.mudrod.driver.StorageDriver; +import org.apache.sdap.mudrod.driver.StorageDriverFactory; import org.apache.sdap.mudrod.driver.SparkDriver; import org.apache.sdap.mudrod.integration.LinkageIntegration; +import org.apache.sdap.mudrod.utils.ClassLoadingUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +59,7 @@ public class MudrodEngine { private static final Logger LOG = LoggerFactory.getLogger(MudrodEngine.class); private Properties props = new Properties(); - private ESDriver es = null; + private StorageDriver sd = null; private SparkDriver spark = null; private 
static final String LOG_INGEST = "logIngest"; private static final String META_INGEST = "metaIngest"; @@ -75,13 +77,17 @@ public MudrodEngine() { } /** - * Start the {@link ESDriver}. Should only be called after call to + * Start the {@link StorageDriver}. Should only be called after call to * {@link MudrodEngine#loadConfig()} * - * @return fully provisioned {@link ESDriver} + * @return fully provisioned {@link StorageDriver} + * @throws Exception if {@link StorageDriverFactory} fails to create the driver + * @throws ClassNotFoundException if the configured driver class cannot be loaded */ - public ESDriver startESDriver() { - return new ESDriver(props); + public StorageDriver startStorageDriver() throws ClassNotFoundException, Exception { + return StorageDriverFactory.createDataStore( + ClassLoadingUtils.loadClass(props.getProperty(MudrodConstants.STORAGE_DRIVER, "org.apache.sdap.mudrod.storage.solr.SolrDriver")), + props); } /** @@ -105,12 +111,12 @@ public Properties getConfig() { } /** - * Retreive the Mudrod {@link ESDriver} + * Retrieve the Mudrod {@link StorageDriver} * - * @return the {@link ESDriver} instance. + * @return the {@link StorageDriver} instance.
*/ - public ESDriver getESDriver() { - return this.es; + public StorageDriver getStorageDriver() { + return this.sd; } /** @@ -119,8 +125,8 @@ public ESDriver getESDriver() { * @param es * an ES driver instance */ - public void setESDriver(ESDriver es) { - this.es = es; + public void setStorageDriver(StorageDriver sd) { + this.sd = sd; } private InputStream locateConfig() { @@ -214,7 +220,7 @@ private String decompressSVMWithSGDModel(String archiveName) throws IOException * for weblog */ public void startLogIngest() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, sd, spark); wd.preprocess(); wd.process(); LOG.info("Logs have been ingested successfully"); @@ -224,26 +230,26 @@ public void startLogIngest() { * updating and analysing metadata to metadata similarity results */ public void startMetaIngest() { - DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, sd, spark); md.preprocess(); md.process(); - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); + DiscoveryEngineAbstract recom = new RecommendEngine(props, sd, spark); recom.preprocess(); recom.process(); LOG.info("Metadata has been ingested successfully."); } public void startFullIngest() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, sd, spark); wd.preprocess(); wd.process(); - DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, sd, spark); md.preprocess(); md.process(); - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); + DiscoveryEngineAbstract recom = new RecommendEngine(props, sd, spark); recom.preprocess(); recom.process(); LOG.info("Full ingest has finished successfully."); @@ -254,30 +260,30 @@ 
public void startFullIngest() { * weblog, ontology and metadata, linkage discovery and integration. */ public void startProcessing() { - DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract wd = new WeblogDiscoveryEngine(props, sd, spark); wd.process(); - DiscoveryEngineAbstract od = new OntologyDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract od = new OntologyDiscoveryEngine(props, sd, spark); od.preprocess(); od.process(); - DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, es, spark); + DiscoveryEngineAbstract md = new MetadataDiscoveryEngine(props, sd, spark); md.preprocess(); md.process(); - LinkageIntegration li = new LinkageIntegration(props, es, spark); + LinkageIntegration li = new LinkageIntegration(props, sd, spark); li.execute(); - DiscoveryEngineAbstract recom = new RecommendEngine(props, es, spark); + DiscoveryEngineAbstract recom = new RecommendEngine(props, sd, spark); recom.process(); } /** - * Close the connection to the {@link ESDriver} instance. + * Close the connection to the {@link StorageDriver} instance. 
*/ public void end() { - if (es != null) { - es.close(); + if (sd != null) { + sd.close(); } } @@ -366,7 +372,8 @@ public static void main(String[] args) { me.props.put(MudrodConstants.ES_HTTP_PORT, esHttpPort); } - me.es = new ESDriver(me.getConfig()); + //check out logic presented in #startStorageDriver + me.sd = new StorageDriver(?, me.getConfig()); me.spark = new SparkDriver(me.getConfig()); loadPathConfig(me, dataDir); if (processingType != null) { diff --git a/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java b/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java index bda78a5..9e63036 100644 --- a/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java +++ b/core/src/main/java/org/apache/sdap/mudrod/utils/ClassLoadingUtils.java @@ -46,7 +46,7 @@ public static Class loadClass(String className) throws ClassNotFoundException * @param contextClass The name of a context class to use. * @param className The name of the class to load. * @return The class or null if no class loader could load the class. - * @throws ClassNotFoundException Aif and only if no definition for the class with the specified name could be found. + * @throws ClassNotFoundException if and only if no definition for the class with the specified name could be found. */ public static Class loadClass(Class contextClass, String className) throws ClassNotFoundException { Class clazz = null; diff --git a/core/src/main/resources/config.properties b/core/src/main/resources/config.properties index 5ed2504..c280ef5 100644 --- a/core/src/main/resources/config.properties +++ b/core/src/main/resources/config.properties @@ -10,7 +10,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-# Database configuration +# StorageDriver Class +mudrod.storage.driver=org.apache.sdap.mudrod.storage.elasticsearch.ElasticSearchDriver + +# Elasticsearch configuration mudrod.cluster.name = MudrodES mudrod.es.transport.tcp.port = 9300 mudrod.es.unicast.hosts = 127.0.0.1 diff --git a/pom.xml b/pom.xml index 4687ae4..38d0f83 100644 --- a/pom.xml +++ b/pom.xml @@ -135,7 +135,6 @@ - analysis core service storage diff --git a/web/pom.xml b/web/pom.xml index 2c2da28..5b95c58 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -23,7 +23,6 @@ 0.0.1-SNAPSHOT ../
- mudrod-web