Сравните 2 Spark RDD, чтобы убедиться, что значение сначала в диапазоне второго RDD -- apache-spark пол Связанный проблема

compare 2 spark RDD to make sure that value from first is in the range of the second RDD


3
vote

проблема

русский

Есть 2 очень большой RDD (каждый имеет больше, чем Records Milion), первый это:

 <код> rdd1.txt(name,value):     chr1    10016  chr1    10017  chr1    10018  chr1    20026  chr1    20036  chr1    25016  chr1    26026 chr2    40016  chr2    40116  chr2    50016  chr3    70016   rdd2.txt(name,min,max): chr1     10000  20000 chr1     20000  30000 chr2     40000  50000 chr2     50000  60000 chr3     70000  80000 chr3    810001  910000 chr3    860001  960000 chr3    910001  1010000   

Значение действителен только тогда, когда он находится в диапазоне между мин и максимум второго RDD, количество имени имени будет плюс 1, если его действительный

Введите вышеупомянутый в качестве примера, Chr1 происходит 7.

Как я могу получить результат в Scala с искрой?

Большое спасибо

Английский оригинал

there are 2 very large RDD(each has more than milion records), the first is :

rdd1.txt(name,value):     chr1    10016  chr1    10017  chr1    10018  chr1    20026  chr1    20036  chr1    25016  chr1    26026 chr2    40016  chr2    40116  chr2    50016  chr3    70016   rdd2.txt(name,min,max): chr1     10000  20000 chr1     20000  30000 chr2     40000  50000 chr2     50000  60000 chr3     70000  80000 chr3    810001  910000 chr3    860001  960000 chr3    910001  1010000 

the value is valid only when it's in the range between the Min and Max of the second RDD , the count of the name's occurs will plus 1 if its valid

Take the above as an example, the chr1's occurs 7.

how can i get the result in scala with spark?

many thanks

</div
  
   
   

Список ответов

2
 
vote

попробуйте:

 <код> <div class="access1">MY ACCESS</div>0  
 

Try:

val rdd1 = sc.parallelize(Seq(   ("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018))) val rdd2 = sc.parallelize(Seq(    ("chr1", 10000, 20000), ("chr1",20000, 30000)))  rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name"))  .where($"value".between($"min", $"max")) 
</div
 
 
     
     
0
 
vote
<Р> Как я понимаю, вы хотите значения от rdd1, попадающие между мин и макс в rdd2. Пожалуйста, смотрите, если ниже работ
 <код> val rdd1 = sc.parallelize(Seq(("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018))) val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000))) rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()   scala> val rdd1=sc.parallelize(Seq(("chr1",    10016 ),("chr1",    10017 ),("chr1",    10018 ),("chr1",    20026 ),("chr1",    20036 ),("chr1",    25016 ),("chr1",    26026),("chr2",    40016 ),("chr2",    40116 ),("chr2",    50016 ),("chr3",    70016 ))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24  scala> val rdd2=sc.parallelize(Seq(("chr1",     10000,  20000),("chr1",     20000 , 30000),("chr2",     40000  ,50000),("chr2",     50000  ,60000),("chr3",     70000  ,80000),("chr3",    810001  ,910000),("chr3",    860001  ,960000),("chr3",    910001  ,1010000))) rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24   scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() +----+-----+ |name|count| +----+-----+ |chr3|    1| |chr1|    7| |chr2|    3| +----+-----+   
<Р> <сильный> редактирует Если чтение из файла, я хотел бы использовать следующую
 <код> import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};  val sqlContext = new SQLContext(sc) val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true))) val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true))) val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv") val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv") rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()   
<Р> Это будет работать на всех узлах и нет необходимости распараллеливания вызова. Цитируя документации здесь < / р>
<Р> Защита Распараллеливать [Т] (далее: Seq [T] numSlices: Int = defaultParallelism) (неявное arg0: ClassTag [Т]): РДД [Т] Постоянная ссылка Распределить местную коллекцию Scala для формирования RDD.
 

As I understand , you want values from rdd1 that fall between min and max in rdd2. Please see if the below works

val rdd1 = sc.parallelize(Seq(("chr1", 10016 ), ("chr1", 10017), ("chr1", 10018))) val rdd2 = sc.parallelize(Seq(("chr1", 10000, 20000), ("chr1",20000, 30000))) rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show()   scala> val rdd1=sc.parallelize(Seq(("chr1",    10016 ),("chr1",    10017 ),("chr1",    10018 ),("chr1",    20026 ),("chr1",    20036 ),("chr1",    25016 ),("chr1",    26026),("chr2",    40016 ),("chr2",    40116 ),("chr2",    50016 ),("chr3",    70016 ))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24  scala> val rdd2=sc.parallelize(Seq(("chr1",     10000,  20000),("chr1",     20000 , 30000),("chr2",     40000  ,50000),("chr2",     50000  ,60000),("chr3",     70000  ,80000),("chr3",    810001  ,910000),("chr3",    860001  ,960000),("chr3",    910001  ,1010000))) rdd2: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24   scala> rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() +----+-----+ |name|count| +----+-----+ |chr3|    1| |chr1|    7| |chr2|    3| +----+-----+ 

Edits If you are reading from a file , i would use the following

import org.apache.spark.sql.SQLContext import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};  val sqlContext = new SQLContext(sc) val nameValueSchema = StructType(Array(StructField("name", StringType, true),StructField("value", IntegerType, true))) val nameMinMaxSchema = StructType(Array(StructField("name", StringType, true),StructField("min", IntegerType, true),StructField("max", IntegerType, true))) val rdd1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameValueSchema).load("rdd1.csv") val rdd2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").schema(nameMinMaxSchema).load("rdd2.csv") rdd1.toDF("name", "value").join(rdd2.toDF("name", "min", "max"), Seq("name")).where($"value".between($"min", $"max")).groupBy($"name").count().show() 

This will run on all nodes and there is no need of parallelize call. Quoting the documentation here

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T] Permalink Distribute a local Scala collection to form an RDD.

</div
 
 
         
         

Связанный проблема

1  Исквое взаимодействие с кафкой с двумя разными принципами  ( Spark interaction with kafka with two different principals ) 
У меня есть следующий вопрос. Я использую искренную структурированную потоковую работу, которая читает из одной темы и пишет на другую тему того же кишечного ...

16  Ошибка теста SBT: Java.lang.noSuchmethodError: net.jpountz.lz4.lz4lockinputtream  ( Sbt test error java lang nosuchmethoderror net jpountz lz4 lz4blockinputstream ) 
Получение ниже исключения, когда я пытался выполнить тесты подразделения для моего светового потокового кода на SBT Windows, используя STALATEST. sbt teston...

1  Загрузка данных NOSQL в Spark Nuckes  ( Loading nosql data into spark nodes ) 
Я пытаюсь понять, что происходит, когда я загружаю данные в искру от источника NoSQL. т.е. Постарается ли это загрузить записи в драйвер, а затем распределить...

1  Оптимизатор Spark Catalyst отличающий исключение  ( Spark catalyst optimizer cast exception ) 
У меня есть 2 (<код> Foo и <код> Bar ) классов, каждый реализует один интерфейс. Приложение имеет метод, который проверяет некоторые условия для интерфейсо...

1  Как я могу избежать проблемы OOM во время записи огромных данных в формате ORC с помощью Pyspark?  ( How can i avoid oom issue while writing huge dataframes in orc format using pysp ) 
У меня есть два скрипта: а б. В скрипте «A» два файла CSV прочитаются на две кадры данных, а затем соединены в результирующем кадр данных, которые затем запис...

2  Искра оболочки ошибка: ERROR SparkDeploySchedulerBackend: В ответ на просьбу удалить несуществующую исполнителя 11  ( Spark shell error error sparkdeployschedulerbackend asked to remove non exist ) 
<Р> Всякий раз, когда я начинаю искровой скорлупу на mapr Песочница я продолжаю получать эту ошибку <код> ERROR SparkDeploySchedulerBackend: Asked to remove ...

1  Pyspark Json String Parsing - ОШИБКА: ValueError: «JSON» не в списке - нет Pandas  ( Pyspark json string parsing error valueerror json is not in list no pand ) 
У меня есть таблица улей со скалярным / нормальным значениями с столбцом в виде JSON в формате String. Давайте возьмем данные ниже списка в качестве примера: ...

1  Spark Streaming MapWithState не удается через 48+ часов с вопросом записи контрольной точки  ( Spark streaming mapwithstate fails after 48 hours with checkpoint write issue ) 
У нас есть программа для искровой потоковой передачи, которая читает вход от KAFKA, используя CONTRESTIRECTSTREAM и создает композитный объект на основе общ...

0  Как отбросить разделы от внешнего стола улья в искру без включения Hivesupport в Spark Session  ( How to drop partitions from hive external table in spark without enabling hivesu ) 
Я хотел бросить перегородки улей в Spark 2.0, но при создании искренности я не хотел включить поддержку улей, так как требуется много библиотек .. Есть ли спо...

-1  Apache Spark: Создание динамической даты и временного фильтра на основе входов пользовательского интерфейса  ( Apache spark build dynamic date and time filter based on the ui inputs ) 
ui, который позволяет пользователю выбирать год, месяц, квартал, ежегодно, одноместный, первую / вторую половину года и многое другое с действительными комбин...

0  Pyspark: Как управлять, какие узлы работают рабочие места?  ( Pyspark how to control which nodes jobs run on ) 
Я запускаю некоторые анализы на искровой кластере, который проявляет некоторое странное поведение - некоторые из 20+ узлов иногда станут не отвечать на ответ....

0  Как увидеть панель прогресса wget и вывод в результате вывода данных databricks  ( How to see wget progress bar and output in databricks output ) 
Я использую WGET, чтобы загрузить файл CSV, но не понятно о ходе загрузки, так как я не могу видеть панель прогресса на выходе, есть ли способ увидеть панель ...

2  Если данные Spark будут кэшировать Off-Heap, у него будет спецификация байта?  ( If sparks data will be cached off heap will it have a byte level specification ) 
Я узнал из разных блогов, особенно этот один , что в ближайшее время Apache Spark будет кэшировать кучу Java (в public class AndroidAudioDevice { AudioT...

3  Pyspark RDD совокупность различных полей значений по-разному  ( Pyspark rdd aggregate different value fields differently ) 
Это довольно открытый законченный вопрос, но у меня есть RDD в этом формате. <код> [('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5 )), ('2014-07', ('246655', 63...

-1  Лучший способ проектирования и параллелизма зажигания в Scala [закрыто]  ( Best way to design and parallelize a spark application in scala ) 
<в сторону CLASS = "S-NEWACTS S-WELTIVE__info JS-Post-New Imide MB16« Роль = «Статус»> закрыт . Этот вопрос находится на основе мнения . В настоящее вре...

Связанный проблема

1  Исквое взаимодействие с кафкой с двумя разными принципами 
16  Ошибка теста SBT: Java.lang.noSuchmethodError: net.jpountz.lz4.lz4lockinputtream 
1  Загрузка данных NOSQL в Spark Nuckes 
1  Оптимизатор Spark Catalyst отличающий исключение 
1  Как я могу избежать проблемы OOM во время записи огромных данных в формате ORC с помощью Pyspark? 
2  Искра оболочки ошибка: ERROR SparkDeploySchedulerBackend: В ответ на просьбу удалить несуществующую исполнителя 11 
1  Pyspark Json String Parsing - ОШИБКА: ValueError: «JSON» не в списке - нет Pandas 
1  Spark Streaming MapWithState не удается через 48+ часов с вопросом записи контрольной точки 
0  Как отбросить разделы от внешнего стола улья в искру без включения Hivesupport в Spark Session 
-1  Apache Spark: Создание динамической даты и временного фильтра на основе входов пользовательского интерфейса 
0  Pyspark: Как управлять, какие узлы работают рабочие места? 
0  Как увидеть панель прогресса wget и вывод в результате вывода данных databricks 
2  Если данные Spark будут кэшировать Off-Heap, у него будет спецификация байта? 
3  Pyspark RDD совокупность различных полей значений по-разному 
-1  Лучший способ проектирования и параллелизма зажигания в Scala [закрыто] 



© 2021 www.qaru.top All Rights Reserved. Q&A House все права защищены


Licensed under cc by-sa 3.0 with attribution required.