Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ service/.classpath
web/.classpath
web/.externalToolBuilders/
web/maven-eclipse.xml
service/mudrod_service.log*
21 changes: 19 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

<artifactId>mudrod-core</artifactId>

<name>Mudrod :: Core</name>
<description>Core Mudrod library implementation.</description>
<name>SDAP Search :: Core</name>
<description>Core SDAP Search library</description>

<properties>
<!-- This property is the name of the directory containing the SVM/SGD Model. It is used during build to create a zip file from this directory -->
Expand Down Expand Up @@ -72,6 +72,23 @@
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
</dependency>
<!-- Optional Dependencies to support Geo Queries
see https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.2/java-geo-queries.html
-->
<dependency>
<groupId>org.locationtech.spatial4j</groupId>
<artifactId>spatial4j</artifactId>
</dependency>
<dependency>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
<exclusions>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- End of Elasticsearch dependencies -->

<!-- Spark dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public abstract class MudrodAbstract implements Serializable {
protected long endTime;

protected static final String ES_SETTINGS = "elastic_settings.json";
protected static final String ES_MAPPINGS = "elastic_mappings.json";
protected static final String ES_DEFAULT_MAPPINGS = "elastic_default_mappings.json";
protected static final String ES_RAW_METADATA_MAPPINGS = "elastic_raw_metadata_mappings.json";

public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) {
this.props = props;
Expand All @@ -62,8 +63,27 @@ public MudrodAbstract(Properties props, ESDriver es, SparkDriver spark) {
*/
@CheckForNull
protected void initMudrod() {
// No type supplied: delegate to the type-aware overload with the '_default_' mapping type.
initMudrod("_default_");
}

/**
* Method of setting up essential configuration for MUDROD to start
* @param type the 'type' of storage mapping to establish. Options include
* '_default_' and 'RawMetadata'.
*/
@CheckForNull
protected void initMudrod(String type) {
InputStream settingsStream = getClass().getClassLoader().getResourceAsStream(ES_SETTINGS);
InputStream mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_MAPPINGS);
InputStream mappingsStream = null;
switch (type) {
case "RawMetadata":
mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_RAW_METADATA_MAPPINGS);
break;
default:
mappingsStream = getClass().getClassLoader().getResourceAsStream(ES_DEFAULT_MAPPINGS);
break;
}

JSONObject settingsJSON = null;
JSONObject mappingJSON = null;

Expand All @@ -81,7 +101,7 @@ protected void initMudrod() {

try {
if (settingsJSON != null && mappingJSON != null) {
this.es.putMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME), settingsJSON.toString(), mappingJSON.toString());
this.es.putMapping(props.getProperty(MudrodConstants.ES_INDEX_NAME), type, settingsJSON.toString(), mappingJSON.toString());
}
} catch (IOException e) {
LOG.error("Error entering Elasticsearch Mappings!", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public void preprocess() {

@Override
public void process() {
// TODO Auto-generated method stub
LOG.info("Recommendation processing starts.");

startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
import org.apache.sdap.mudrod.driver.ESDriver;
import org.apache.sdap.mudrod.driver.SparkDriver;
import org.apache.sdap.mudrod.main.MudrodConstants;
import org.apache.sdap.mudrod.weblog.pre.*;
import org.apache.sdap.mudrod.weblog.pre.ClickStreamGenerator;
import org.apache.sdap.mudrod.weblog.pre.CrawlerDetection;
import org.apache.sdap.mudrod.weblog.pre.HistoryGenerator;
import org.apache.sdap.mudrod.weblog.pre.ImportLogFile;
import org.apache.sdap.mudrod.weblog.pre.RemoveRawLog;
import org.apache.sdap.mudrod.weblog.pre.SessionGenerator;
import org.apache.sdap.mudrod.weblog.pre.SessionStatistic;
import org.apache.sdap.mudrod.weblog.process.ClickStreamAnalyzer;
import org.apache.sdap.mudrod.weblog.process.UserHistoryAnalyzer;
import org.slf4j.Logger;
Expand Down
128 changes: 100 additions & 28 deletions core/src/main/java/org/apache/sdap/mudrod/driver/ESDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,15 @@
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -123,7 +130,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
throw new RuntimeException("Caught exception in bulk: " + request.getDescription() + ", failure: " + failure, failure);
}
}).setBulkActions(1000).setBulkSize(new ByteSizeValue(2500500, ByteSizeUnit.GB)).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 10)).setConcurrentRequests(1)
.build());
.build());
}

public void destroyBulkProcessor() {
Expand All @@ -137,15 +144,11 @@ public void destroyBulkProcessor() {
}
}

public void putMapping(String indexName, String settingsJson, String mappingJson) throws IOException {

boolean exists = getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists();
if (exists) {
return;
public void putMapping(String indexName, String type, String settingsJson, String mappingJson) throws IOException {
if(!getClient().admin().indices().prepareExists(indexName).execute().actionGet().isExists()) {
getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
}

getClient().admin().indices().prepareCreate(indexName).setSettings(Settings.builder().loadFromSource(settingsJson)).execute().actionGet();
getClient().admin().indices().preparePutMapping(indexName).setType("_default_").setSource(mappingJson).execute().actionGet();
getClient().admin().indices().preparePutMapping(indexName).setType(type).setSource(mappingJson).execute().actionGet();
}

public String customAnalyzing(String indexName, String str) throws InterruptedException, ExecutionException {
Expand Down Expand Up @@ -188,11 +191,11 @@ public void deleteAllByQuery(String index, String type, QueryBuilder query) {
.metaData()
.index(index)
.getMappings();

//check if the type exists
if (!mappings.containsKey(type))
return;

createBulkProcessor();
SearchResponse scrollResp = getClient()
.prepareSearch(index)
Expand All @@ -210,7 +213,11 @@ public void deleteAllByQuery(String index, String type, QueryBuilder query) {
getBulkProcessor().add(deleteRequest);
}

scrollResp = getClient().prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
scrollResp = getClient()
.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(600000))
.execute()
.actionGet();
if (scrollResp.getHits().getHits().length == 0) {
break;
}
Expand Down Expand Up @@ -255,19 +262,63 @@ public List<String> getIndexListWithPrefix(Object object) {
return indexList;
}

public String searchByQuery(String index, String type, String query) throws IOException, InterruptedException, ExecutionException {
return searchByQuery(index, type, query, false);
/**
 * Retrieves the source of every document of the given types in an index.
 *
 * <p>Results are fetched with the scroll API rather than from/size paging.
 * With a page size of 5000, from/size paging requests {@code from + size}
 * beyond Elasticsearch's default {@code index.max_result_window} (10,000) on
 * the third page, which makes the search fail for any index holding 10,000 or
 * more matching documents.
 *
 * @param index the index to read from
 * @param types the document types to include
 * @return a list with one source map per matching document; empty when
 *         nothing matches
 */
public List<Map<String, Object>> getAllDocs(String index, String... types) {
  final int scrollSize = 5000;
  // Same keep-alive (in milliseconds) that the scroll loop in deleteAllByQuery passes to setScroll.
  final TimeValue keepAlive = new TimeValue(600000);
  List<Map<String, Object>> esData = new ArrayList<>();

  SearchResponse response = getClient()
      .prepareSearch(index)
      .setTypes(types)
      .setQuery(QueryBuilders.matchAllQuery())
      .setScroll(keepAlive)
      .setSize(scrollSize)
      .execute()
      .actionGet();

  // An empty page signals that the scroll is exhausted.
  while (response.getHits().getHits().length != 0) {
    for (SearchHit hit : response.getHits()) {
      esData.add(hit.getSource());
    }
    response = getClient()
        .prepareSearchScroll(response.getScrollId())
        .setScroll(keepAlive)
        .execute()
        .actionGet();
  }
  return esData;
}

/**
 * Searches the given document types of an index with a free-text query
 * string. The text is turned into a query-string query and handed to the
 * {@link QueryBuilder}-based overload with the detail flag set to false.
 *
 * @param index the index to query
 * @param query the query string (text string) itself
 * @param types the types to query
 * @return JSON {@link java.lang.String} representing the query results.
 */
public String searchByQuery(String index, String query, String... types) {
  QueryBuilder textQuery = QueryBuilders.queryStringQuery(query);
  return searchByQuery(index, textQuery, false, types);
}

/**
* Searches an index by document types using a pre-built {@link QueryBuilder}. This
* method includes an optional flag which toggles detailed metadata retrieval.
* @param index the index to query
* @param queryBuilder the QueryBuilder object to execute
* @param bDetail whether this is a full, detailed query over entire
* metadata as opposed to a subset of the available metadata. False by
* default.
* @param types the types to query
* @return JSON {@link java.lang.String} representing the query results.
*/
@SuppressWarnings("unchecked")
public String searchByQuery(String index, String type, String query, Boolean bDetail) throws IOException, InterruptedException, ExecutionException {
public String searchByQuery(String index, QueryBuilder queryBuilder, Boolean bDetail, String... types) {
boolean exists = getClient().admin().indices().prepareExists(index).execute().actionGet().isExists();
if (!exists) {
return null;
}

QueryBuilder qb = QueryBuilders.queryStringQuery(query);
SearchResponse response = getClient().prepareSearch(index).setTypes(type).setQuery(qb).setSize(500).execute().actionGet();
SearchResponse response = getClient()
.prepareSearch(index)
.setTypes(types)
.setQuery(queryBuilder)
.setSize(5000)
.execute()
.actionGet();

// Map of K,V pairs where the key is the field name from the search result and the value is the name that should be returned for that field. Not always the same.
Map<String, String> fieldsToReturn = new HashMap<>();
Expand All @@ -277,6 +328,8 @@ public String searchByQuery(String index, String type, String query, Boolean bDe
fieldsToReturn.put("DatasetParameter-Topic", "Topic");
fieldsToReturn.put("Dataset-Description", "Dataset-Description");
fieldsToReturn.put("DatasetCitation-ReleaseDateLong", "Release Date");
fieldsToReturn.put("Dataset-Metadata", "Dataset-Metadata");
fieldsToReturn.put("events", "events");

if (bDetail) {
fieldsToReturn.put("DatasetPolicy-DataFormat", "DataFormat");
Expand Down Expand Up @@ -313,17 +366,18 @@ public String searchByQuery(String index, String type, String query, Boolean bDe
Map<String, Object> source = hit.getSource();

Map<String, Object> searchResult = source.entrySet().stream().filter(entry -> fieldsToReturn.keySet().contains(entry.getKey()))
.collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));
.collect(Collectors.toMap(entry -> fieldsToReturn.get(entry.getKey()), Entry::getValue));

// searchResult is now a map where the key = value from fieldsToReturn and the value = value from search result

searchResult.put("id", hit.getId());

// Some results require special handling/formatting:
// Release Date formatting
LocalDate releaseDate = Instant.ofEpochMilli(Long.parseLong(((ArrayList<String>) searchResult.get("Release Date")).get(0))).atZone(ZoneId.of("Z")).toLocalDate();
searchResult.put("Release Date", releaseDate.format(DateTimeFormatter.ISO_DATE));

if (bDetail) {

// DataFormat value, translate RAW to BINARY
if ("RAW".equals(searchResult.get("DataFormat"))) {
searchResult.put("DataFormat", "BINARY");
Expand All @@ -336,8 +390,8 @@ public String searchByQuery(String index, String type, String query, Boolean bDe
// Time Span Formatting
LocalDate startDate = Instant.ofEpochMilli((Long) searchResult.get("DatasetCoverage-StartTimeLong-Long")).atZone(ZoneId.of("Z")).toLocalDate();
LocalDate endDate = "".equals(searchResult.get("Dataset-DatasetCoverage-StopTimeLong")) ?
null :
Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
null :
Instant.ofEpochMilli(Long.parseLong(searchResult.get("Dataset-DatasetCoverage-StopTimeLong").toString())).atZone(ZoneId.of("Z")).toLocalDate();
searchResult.put("Time Span", startDate.format(DateTimeFormatter.ISO_DATE) + " to " + (endDate == null ? "Present" : endDate.format(DateTimeFormatter.ISO_DATE)));

// Temporal resolution can come from one of two fields
Expand All @@ -358,7 +412,7 @@ public String searchByQuery(String index, String type, String query, Boolean bDe

// Measurement is a list of hierarchies that goes Topic -> Term -> Variable -> Variable Detail. Need to construct these hierarchies.
List<List<String>> measurements = buildMeasurementHierarchies((List<String>) searchResult.get("Topic"), (List<String>) searchResult.get("DatasetParameter-Term-Full"),
(List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));
(List<String>) searchResult.get("DatasetParameter-Variable-Full"), (List<String>) searchResult.get("DatasetParameter-VariableDetail"));

searchResult.put("Measurements", measurements);

Expand Down Expand Up @@ -432,15 +486,21 @@ public List<String> autoComplete(String index, String term) {
return new ArrayList<>();
}

Set<String> suggestHS = new HashSet<String>();
Set<String> suggestHS = new HashSet<>();
List<String> suggestList = new ArrayList<>();

// please make sure that the completion field is configured in the ES mapping
CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders.completionSuggestion("Dataset-Metadata").prefix(term, Fuzziness.fromEdits(2)).size(100);
SearchRequestBuilder suggestRequestBuilder = getClient().prepareSearch(index).suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
CompletionSuggestionBuilder suggestionsBuilder = SuggestBuilders
.completionSuggestion("Dataset-Metadata")
.prefix(term, Fuzziness.fromEdits(2))
.size(100);
SearchRequestBuilder suggestRequestBuilder = getClient()
.prepareSearch(index)
.suggest(new SuggestBuilder().addSuggestion("completeMe", suggestionsBuilder));
SearchResponse sr = suggestRequestBuilder.setFetchSource(false).execute().actionGet();

Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest().getSuggestion("completeMe").iterator().next().getOptions().iterator();
Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = sr.getSuggest()
.getSuggestion("completeMe").iterator().next().getOptions().iterator();

while (iterator.hasNext()) {
Suggest.Suggestion.Entry.Option next = iterator.next();
Expand Down Expand Up @@ -490,7 +550,9 @@ protected Client makeClient(Properties props) throws IOException {

Client client = null;

if (hosts != null && port > 1) {
// Prefer TransportClient
if (hosts != null && port > 1) {
@SuppressWarnings("resource")
TransportClient transportClient = new ESTransportClient(settings);
for (String host : hosts)
transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
Expand Down Expand Up @@ -542,6 +604,16 @@ public void setBulkProcessor(BulkProcessor bulkProcessor) {
this.bulkProcessor = bulkProcessor;
}

/**
* Generates an {@link UpdateRequest} that updates a single field of one document.
*
* @param index index you wish to update documents within
* @param type document type you wish to update
* @param id a unique document ID you wish to update
* @param field1 the field of the document you want to update
* @param value1 the associated value for that field
* @return a populated UpdateRequest
* ready to execute.
*/
public UpdateRequest generateUpdateRequest(String index, String type, String id, String field1, Object value1) {

UpdateRequest ur = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public interface MudrodConstants {
/** Defined on CLI */
public static final String MUDROD_CONFIG = "MUDROD_CONFIG";
/**
* An {@link Ontology} implementation.
* An {@link org.apache.sdap.mudrod.ontology.Ontology} implementation.
*/
public static final String ONTOLOGY_IMPL = MUDROD + "ontology.implementation";

Expand Down
Loading