Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,11 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_BASELINE_AUTO_ADJUST_ENABLED = "IGNITE_BASELINE_AUTO_ADJUST_ENABLED";

/**
* Flag to enable persistence rebalance.
*/
public static final String IGNITE_PERSISTENCE_REBALANCE_ENABLED = "IGNITE_PERSISTENCE_REBALANCE_ENABLED";

/**
* Maximum number of diagnostic warning messages per category, when waiting for PME.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default wal archive directory. */
public static final String DFLT_WAL_ARCHIVE_PATH = "db/wal/archive";

/** Default working directory for backup temporary files. */
public static final String DFLT_BACKUP_DIRECTORY = "db/backup";

/** Default write throttling enabled. */
public static final boolean DFLT_WRITE_THROTTLING_ENABLED = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ public enum GridTopic {
TOPIC_SERVICES,

/** */
TOPIC_DEADLOCK_DETECTION;
TOPIC_DEADLOCK_DETECTION,

/** */
TOPIC_REBALANCE;

/** Enum values. */
private static final GridTopic[] VALS = values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ public enum IgniteFeatures {
TCP_DISCOVERY_MESSAGE_NODE_COMPACT_REPRESENTATION(14),

/** LRT system and user time dump settings. */
LRT_SYSTEM_USER_TIME_DUMP_SETTINGS(18);
LRT_SYSTEM_USER_TIME_DUMP_SETTINGS(18),

/** */
CACHE_PARTITION_FILE_REBALANCE(19);

/**
* Unique feature identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.preload.GridPartitionBatchDemandMessage;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
Expand Down Expand Up @@ -1166,6 +1167,11 @@ public GridIoMessageFactory(MessageFactory[] ext) {

break;

case GridPartitionBatchDemandMessage.TYPE_CODE:
msg = new GridPartitionBatchDemandMessage();

break;

// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Class represents a file meta information to send to the remote node. Used to initiate a new file transfer
* process or to continue the previous unfinished from the last transmitted point.
*/
class TransmissionMeta implements Externalizable {
public class TransmissionMeta implements Externalizable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
* Persistent store of pages.
*/
public interface PageStore {
/**
* @param lsnr Page store listener to set.
*/
public void setListener(PageStoreListener lsnr);

/**
* Checks if page exists.
*
Expand Down Expand Up @@ -55,7 +60,18 @@ public interface PageStore {
* @param keepCrc by default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc
* @throws IgniteCheckedException If reading failed (IO error occurred).
*/
public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;
public default void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException {
readPage(pageId, pageBuf, keepCrc);
}

/**
* @param pageId Page id.
* @param pageBuf Page buffer to read into.
* @param keepCrc by default reading zeroes CRC which was on file, but you can keep it in pageBuf if set keepCrc
* @return Number of read bytes, or negative value if page read the first time.
* @throws IgniteCheckedException If reading failed (IO error occurred).
*/
public int readPage(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException;

/**
* Reads a header.
Expand Down Expand Up @@ -97,6 +113,11 @@ public interface PageStore {
*/
public void ensure() throws IgniteCheckedException;

/**
* Size of page store header.
*/
public int headerSize();

/**
* @return Page store version.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.pagemem.store;

import java.nio.ByteBuffer;

/**
*
*/
@FunctionalInterface
public interface PageStoreListener {
/** Default handler. */
public static PageStoreListener NO_OP = (pageId, buff) -> {};

/**
* @param pageId Handled page id.
* @param buf Buffer with data.
*/
public void onPageWrite(long pageId, ByteBuffer buf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
GridDhtPartitionState state = top.partitionState(waitNode, part);

if (state != GridDhtPartitionState.OWNING) {
System.out.println(">xxx> not owning " + part);
rebalanced = false;

break;
Expand All @@ -297,13 +298,22 @@ void checkRebalanceState(GridDhtPartitionTopology top, Integer checkGrpId) {
}

if (rebalanced) {
System.out.println(">>> checkRebaalnceState remove " + checkGrpId);

waitInfo.waitGrps.remove(checkGrpId);

if (waitInfo.waitGrps.isEmpty()) {
System.out.println(">>> waitInfo.waitGrps empty ");

msg = affinityChangeMessage(waitInfo);

waitInfo = null;
}
else {
Map.Entry<Integer, Map<Integer, UUID>> e = waitInfo.waitGrps.entrySet().iterator().next();

System.out.println(">>> waitInfo.waitGrps=" + waitInfo.waitGrps.keySet().size() + ", first=[name=" + cctx.cache().cacheGroup(e.getKey()).cacheOrGroupName() + ", count=" + e.getValue().size() + "]");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;

/**
*
*/
public interface CacheDataStoreEx extends CacheDataStore {
public CacheDataStore store(boolean readOnly);

public void readOnly(boolean readOnly);

public boolean readOnly();

// /**
// * @param mode The storage mode.
// * @return The storage intance for the given mode.
// */
// public CacheDataStore store(StorageMode mode);
//
// /**
// * @param mode The mode to switch to.
// */
// public void storeMode(StorageMode mode);
//
// /**
// * @return The currently used storage mode. Some of the long-running threads will remain to use
// * the old mode until they finish.
// */
// public StorageMode storeMode();

// /**
// * @return The storage is used to expose temporary cache data rows when the <tt>LOG_ONLY</tt> mode is active.
// */
// public IgnitePartitionCatchUpLog catchLog();
//
// /**
// * @param mode The mode to switch to.
// */
// public IgniteInternalFuture<Void> storeModeAsync(StorageMode mode);

// /**
// * @param mode The mode to associate with data storage instance.
// * @param storage The cache data storage instance to set to.
// */
// public void store(StorageMode mode, IgniteCacheOffheapManager.CacheDataStore storage);

// /**
// *
// */
// public enum StorageMode {
// /** Proxy will normally route all operations to the PageMemrory. */
// FULL,
//
// /** Proxy will redirect the write operations to the temp-WAL storage. */
// READ_ONLY;
// }
}
Loading