[FLINK-27919][connectors] Add FLIP-27-based Data Generator Source.#1
[FLINK-27919][connectors] Add FLIP-27-based Data Generator Source.#1afedulov wants to merge 12 commits intoakalash:masterfrom
Conversation
akalash
left a comment
There was a problem hiding this comment.
Я оставил пару комментариев. Я все еще не уверен стоит ли добавлять currentParallelism в SourceReaderContext с одной стороны это не то чтобы проблема, но не понятно насколько это надо. В целом если это уже обсуждали, то ок.
| MapFunction<Long, OUT> generatorFunction, long count, TypeInformation<OUT> typeInfo) { | ||
| this.typeInfo = checkNotNull(typeInfo); | ||
| this.generatorFunction = checkNotNull(generatorFunction); | ||
| this.numberSource = new NumberSequenceSource(0, count); |
There was a problem hiding this comment.
Лучше иметь только один конструктор с присвоением значений, для избежания ошибок, а остальные должны использовать this(*) с дефолтными значениями если нужно.
| RateLimiter rateLimiter = new GuavaRateLimiter(maxPerSecond, parallelism); | ||
| return new RateLimitedSourceReader<>( | ||
| new MappingIteratorSourceReader<>(readerContext, generatorFunction), | ||
| rateLimiter); |
There was a problem hiding this comment.
Пока мне не понятно, как мне использовать другой RateLimiter если понадобиться. Может быть это и не особо имеет смысл, и если это уже было обсуждено то ок. Но просто как идея, возможно стоит рассмотреть передачу RateLimiter как параметра в этот класс. Т.е. вместо:
new DataGeneratorSourceV3<>(generator, 1000, 2, Types.STRING);
будет
new DataGeneratorSourceV3<>(generator, 1000, RateLimiters.guava(2), Types.STRING);
Хотя скорее всего это будет не сам Limiter а его билдер, т.к. ему нужно как передать readerContext. Вообщем не уверен что есть смысл так заморачиваться, но можно подумать.
There was a problem hiding this comment.
Я решил сделать весь генератор более extendable с возможностью factory для SourceReader's . Пользователь сможет в этой factory решить что и как инициализировать.
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
Show resolved
Hide resolved
| import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter; | ||
|
|
||
| /** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */ | ||
| public class GuavaRateLimiter implements org.apache.flink.api.common.io.ratelimiting.RateLimiter { |
There was a problem hiding this comment.
Зачем тут полный путь до RateLimiter?
There was a problem hiding this comment.
Клэш с гуавой (см import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter)
|
|
||
| @Override | ||
| public int acquire() { | ||
| return (int) rateLimiter.acquire(); |
There was a problem hiding this comment.
Кажется это не правильно, вроде как guava возвращает в секундах и если мы хотим в миллисекундах нужно :
return (int) 1000 * rateLimiter.acquire();
Но это надо проверить
There was a problem hiding this comment.
Действительно. Странный выбор с их стороны, но да ладно. Nice catch!
| import static org.apache.flink.util.Preconditions.checkNotNull; | ||
|
|
||
| @Experimental | ||
| public class MappingIteratorSourceReader< |
There was a problem hiding this comment.
Надо подумать над названием, но я бы наверное ожидал что-то более связанное с Generator чтобы было проще найти. Хотя может и без разницы.
There was a problem hiding this comment.
Либо данный класс не должен принимать generatorFuntion а должен принимать что-то типа mappingFunction
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
Outdated
Show resolved
Hide resolved
|
@akalash |
No description provided.