Является ли Kafka Stream Statestore Global по всем экземплярам или просто местным? -- apache-kafka поле с участием apache-kafka-streams пол Связанный проблема

Is Kafka Stream StateStore global over all instances or just local?


12
vote

проблема

русский

в kafka Stream <код> command => 'C:WindowsSystem32cmd.exe /c C:TempdotNetFx40_Full_setup.exe /q /norestart /log C:TempNetFx40.htm', 1 Пример, он использует <код> command => 'C:WindowsSystem32cmd.exe /c C:TempdotNetFx40_Full_setup.exe /q /norestart /log C:TempNetFx40.htm', 2 для хранения количества слов. Если в той же группе потребителей есть несколько экземпляров, <код> command => 'C:WindowsSystem32cmd.exe /c C:TempdotNetFx40_Full_setup.exe /q /norestart /log C:TempNetFx40.htm', 3 глобальный в группу, или просто локальный к экземпляру потребителей?

thnaks

Английский оригинал

In Kafka Stream WordCount example, it uses StateStore to store word counts. If there are multiple instances in the same consumer group, the StateStore is global to the group, or just local to an consumer instance?

Thnaks

</div
     

Список ответов

24
 
vote
vote
Лучший ответ
 

Это зависит от вашего представления о состоянии состояния.

  1. в kafka Streams. Состояние совместно используется и, следовательно, каждый экземпляр содержит часть общего состояния приложения. Например, использование DSL-гостевого оператора используйте локальный экземпляр ROCKSDB, чтобы удерживать осколок состояния. Таким образом, с этим рассматривается состояние локальное.

  2. С другой стороны, все изменения в штате записаны в кафка-тему. Эта тема не «живет» на хосте приложения, но в кластере KAFKA и состоит из нескольких разделов и может быть реплицирована. В случае ошибки эта тема изменений используется для воссоздания состояния неисправного экземпляра в другом приведенном месте. Таким образом, в качестве изменяемого файла доступен всеми экземплярами приложений, его тоже можно считать глобальным.

Имейте в виду, что изменяемый файл - это правда из состояния приложения, а местные магазины в основном кешируют осколки состояния.

Более того, в примере WordCount поток записи (поток данных) получает разделение слов, так что подсчет одного слова будет поддерживаться одним экземпляром (и разными экземплярами, поддерживающими количество для разных слов). < / P >.

Для архитектурного обзора, я рекомендую http://docs.confluent.io/current /streams/architecture.html

Также этот пост блога должен быть интересным http://www.confluent.io/blog/uning-stream-processing-and--interactive-queries-in-apache-kafka/

 

This depends on your view on a state store.

  1. In Kafka Streams a state is shared and thus each instance holds part of the overall application state. For example, using DSL stateful operator use a local RocksDB instance to hold their shard of the state. Thus, with this regard the state is local.

  2. On the other hand, all changes to the state are written into a Kafka topic. This topic does not "live" on the application host but in the Kafka cluster and consists of multiple partition and can be replicated. In case of an error, this changelog topic is used to recreate the state of the failed instance in another still running instance. Thus, as the changelog is accessible by all application instances, it can be considered to be global, too.

Keep in mind, that the changelog is the truth of the application state and the local stores are basically caches of shards of the state.

Moreover, in the WordCount example, a record stream (the data stream) gets partitioned by words, such that the count of one word will be maintained by a single instance (and different instances maintain the counts for different words).

For an architectural overview, I recommend http://docs.confluent.io/current/streams/architecture.html

Also this blog post should be interesting http://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

</div
 
 
       
       
3
 
vote

Если стоит упомянуть, что есть Глобалка >

Globalktable будет полностью воспроизведен один раз на кафкапротеи. То есть каждый экземпляр KAFKaStreams потребла всех разделов соответствующая тема.

из списка рассылки слияния, у меня есть эта информация

Вы можете начать Прототипирование с использованием KAFKA 0.10.2 (или багажника) филиал ...

0.10.2-RC0 уже имеет глобалкеты!

Вот the the Фактический PR .

и человек, который сказал мне, что это было Matthias J. Sax;)

 

If worth mentioning that there is a GlobalKTable improvement proposal

GlobalKTable will be fully replicated once per KafkaStreams instance. That is, each KafkaStreams instance will consume all partitions of the corresponding topic.

From the Confluent Platform's mailing list, I've got this information

You could start prototyping using Kafka 0.10.2 (or trunk) branch...

0.10.2-rc0 already has GlobalKTable!

Here's the actual PR.

And the person that told me that was Matthias J. Sax ;)

</div
 
 
2
 
vote

Используйте процессор вместо трансформатора, для всех преобразований, которые вы хотите выполнить на входной теме, всякий раз, когда есть упреждающая панель поиска данных из GlobalStateStoreStore. Используйте context.forward(key,value,childName) , чтобы отправить данные в нижние узлы. <Код> context.forward(key,value,childName) Может быть назван несколько раз в <Код> process() и <Код> punctuate() , чтобы отправить несколько записей в Nownstream Node. Если есть требование обновить GLOBALSTATESTESTORESTORE, сделайте это только в процессоре PROCESTOR addGlobalStore(..) , потому что есть глобалstreamThread, связанный с GlobalStateStoreStoreStore, который сохраняет состояние хранения в соответствии с все текущие экземпляры Kstream.

 

Use a Processor instead of Transformer, for all the transformations you want to perform on the input topic, whenever there is a usecase of lookingup data from GlobalStateStore . Use context.forward(key,value,childName) to send the data to the downstream nodes. context.forward(key,value,childName) may be called multiple times in a process() and punctuate() , so as to send multiple records to downstream node. If there is a requirement to update GlobalStateStore, do this only in Processor passed to addGlobalStore(..) because, there is a GlobalStreamThread associated with GlobalStateStore, which keeps the state of the store consistent across all the running kstream instances.

</div
 
 

Связанный проблема

0  Миграция kafka Schema-реестра на новый кафка-кластер  ( Migrate kafka schema registry to new kafka cluster ) 
Мы выполняем миграцию нашего кластера KAFKA (создавая новую и миграцию всех тем, потребителей, там производителей). У нас есть реестр схемы в старом кластере ...

0  Получите вывод как строка ThumberGroupCommand.main ()?  ( Get output as string of consumergroupcommand main ) 
Я хочу использовать <код> ConsumerGroupCommand.main(new String[]{"--bootstrap-server","xxx:9092","--group","consumer3","--describe"}); , чтобы получить метрик...

9  Кафка несколько потребителей для раздела  ( Kafka multiple consumers for a partition ) 
У меня есть производитель, который пишет сообщения в тему / раздел. Чтобы сохранить заказ, я хотел бы пойти с одним разделом, и я хочу, чтобы 12 потребителей ...

1  Прочитайте конкретные сообщения, используя Apache Kafka и Nodejs  ( Read specific messages using apache kafka and nodejs ) 
Я хочу построить API с NODEJS и KAFKA, который может возникнуть смещение и тема в качестве ввода и вывода первых 10 сообщений, начиная с смещения. Я попробова...

1  Как только по технологии KAFKA по одному потребителю?  ( How does kafka process messages by one consumer only ) 
Я читал несколько статей для Кафки, чтобы понять о потребительской группе. У меня есть одно сомнение, как KAFKA гарантирует, что сообщение будет обработано то...

2  Включение SSL на клиентах Kafka  ( Enabling ssl on kafka clients ) 
У нас есть кластер кафки и различные производители и потребители, работающие по крайней мере 5 диферных серверов. Мне попросили защитить среду кафки с помощью...

5  Кафка ACL проблема с использованием кода Java  ( Kafka acl issue using java code ) 
Я хочу предоставить доступ к кафке в теме через приложение Java, поскольку мы проводим через Kafka-acls.sh. Я просто хотел запустить команду ниже через Java A...

1  Исквое взаимодействие с кафкой с двумя разными принципами  ( Spark interaction with kafka with two different principals ) 
У меня есть следующий вопрос. Я использую искренную структурированную потоковую работу, которая читает из одной темы и пишет на другую тему того же кишечного ...

0  Кафка раковина: ошибка остановка из-за ошибки (org.apache.kafka.connect.cli.connectstandstandalone: ​​130)  ( Kafka sink error stopping due to error org apache kafka connect cli connectsta ) 
Пытаюсь потокотать данные из одного файла потока в другой файл. Раньше он работал раньше и внезапно оно обеспечивает ошибку как <код> ERROR Stopping due to er...

0  Kafka Streams - Несколько тем в качестве того же источника или одной темы на одного источника?  ( Kafka streams multiple topics as same source or one topic per source ) 
При построении топологии кафки потоков, чтения из нескольких тем могут быть смоделированы двумя разными способами: Читайте все темы с одним и тем же исходн...

2  Кафка Потребители Путь в зоофире пусто?  ( Kafka consumers path in zookeeper is empty ) 
Я использую <код> zkCli.sh , чтобы перечислить <код> kafka Путей в <Код> zookeeper . на href="http://cwiki.apache.org/confluence/display/kafka/kafka+data+...

0  Как указать записи опроса Kafka в определенном размере?  ( How to specify kafka poll records in specific size ) 
Есть <код> poll Метод для опроса <код> ConsumerRecords , но он не может указать размер записи, как опросить записи в определенном размере? <код> public Co...

1  Безвеловая сборка Gradle с помощью «Connect to Repo.maven.apache.org:443 [repo.maven.apache.org/151.101.36.215] Не удалось: подключение отказано: Connect" ошибка  ( Gradle build failed with connect to repo maven apache org443 repo maven apach ) 
Я пытаюсь сделать установку кафки в Intellij. Но перед этим согласно инструкциям я клонировал репозиторий Kafka Git, используя Git Bash. Позже, когда я попыта...

0  Как обрабатывать специальный характер в Spark  ( How to handle special character in spark ) 
У нас есть данные AVRO в Кафке, где данные содержат специальные символы, такие как «Fetäälö». Мы пытаемся вставить эти данные в Cive, используя работу Spark...

5  Как я могу заставить гнездо открыть с Python, чтобы закрыть?  ( How can i force socket opened with python to close ) 
Я в настоящее время использую эту lib для стресса, тестируйте kafka Server, который я настроил: https: // github .com / dsully / pykafka <код> import kafk...

Связанный проблема

0  Миграция kafka Schema-реестра на новый кафка-кластер 
0  Получите вывод как строка ThumberGroupCommand.main ()? 
9  Кафка несколько потребителей для раздела 
1  Прочитайте конкретные сообщения, используя Apache Kafka и Nodejs 
1  Как только по технологии KAFKA по одному потребителю? 
2  Включение SSL на клиентах Kafka 
5  Кафка ACL проблема с использованием кода Java 
1  Исквое взаимодействие с кафкой с двумя разными принципами 
0  Кафка раковина: ошибка остановка из-за ошибки (org.apache.kafka.connect.cli.connectstandstandalone: ​​130) 
0  Kafka Streams - Несколько тем в качестве того же источника или одной темы на одного источника? 
2  Кафка Потребители Путь в зоофире пусто? 
0  Как указать записи опроса Kafka в определенном размере? 
1  Безвеловая сборка Gradle с помощью «Connect to Repo.maven.apache.org:443 [repo.maven.apache.org/151.101.36.215] Не удалось: подключение отказано: Connect" ошибка 
0  Как обрабатывать специальный характер в Spark 
5  Как я могу заставить гнездо открыть с Python, чтобы закрыть?