زمان تخمینی مطالعه:‌ 7 دقیقه

گسترش اسپارک


برنامه اسپارک، برای گسترش خود روی خوشه ( cluster ) از Spark-submit که یک دستور روی Shell است، استفاده می کند.

برای مدیریت خوشه های مربوطه از واسط uniform ( یک شکل ) استفاده می کند.

بنابراین، لازم نیست برای هر خوشه برنامه را config کنید.

مثال :

برگردیم به همان مثال تعداد کلمات و از دستورات shell استفاده کنیم برای روشن تر شدن موضوع ،

نمونه ورودی

متن زیر ورودی ما است که در فایل in.txt ذخیره شده است.

People are not as beautiful as they look,
as they walk or as they talk.
They are only as beautiful as they love,
as they care as they share.

به برنامه زیر نگاه کنید :

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
24
/* Transform the inputRDD into countRDD */
val count=input.flatMap(line=>line.split(" "))
.map(word=>(word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}

برنامه بالا را در فایلی به نام SparkWordCount.Scala ذخیره کرده و آن را در فولدر Spark-application بریزید.

نکته : هنگام transforming از inutRDD به داخل CountRDD از flatMap() برای توکن کردن خط ( lines ) به کلمات استفاده می کنیم.

از تابع map() برای شمارش تعداد تکرار کلمات و از تابع reduceByKey برای شمارش تکرار هر کلمه استفاده می کنیم.

برای ثبت این برنامه مراحل زیر را طی کنید، تمام مراحل را از طریق ترمینال واقع در پوشه Spark-application اجرا کنید.

مرحله اول : دانلود Spark jar :

برای کامپایل Spark Core Java الزامی است، بنابراین Spark-Core-2.10-1.3.0.jar را از لینک http://mvnrepository.com/artifact/org.apache.spark/Spark-Core-2.10-1.3.0 را دانلود کرده و فایل jar را داخل فولدر Spark-application ذخیره کنید.

مرحله دوم : کامپایل برنامه

با استفاده از دستور زیر برنامه ای که نوشتیم را کامپایل می کنیم، این دستورهم باید از فولدر Spark-application اجرا شود. در اینجا

/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

یک jar با قابلیت ساپورت hadoop از کتابخانه Spark است.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

مرحله سوم : ساخت JAR

فایل jar برنامه را از طریق دستور زیر می سازیم.

در اینجا wordcount نام فایل jar خروجی می باشد.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

مرحله چهارم : ثبت ( Submit ) برنامه اسپارک :

با دستور زیر می توانید برنامه اسپارک را ثبت کنید.

spark-submit --class SparkWordCount --master local wordcount.jar

اگر دستور بالا با موفقیت اجرا شود شما باید خروجی زیر را بگیرید.

واژه ی ok در خروجی برای شناسایی کاربر است و آخرین خط برنامه است.

اگر با دقت به خروجی برنامه زیر دقت کنید، نکات مختلفی در آن پیدا می کنید، مانند :

- اسپارک با موفقیت روی پورت 42954 استارت خورد.

- MemoryStore با حافظه 267.3MB استارت خورد.

- SparkUI روی آدرس http://192.168.1.217:4040 استارت خورد.

- Jar فایل اضافه شد در : /home/hadoop/piapplication/count.jar

- ResultStage اول در 0.566 s تمام شد.

- SparkUI روی آدرس http://192.168.1.217:4040 استاپ شد.

- MemoryStore آزاد شد. ( Clear )

خروجی

15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

مرحله پنجم : چک کردن خروجی :

بعد از اجرای موفقیت آمیز برنامه، شما باید یک فولدر با نام outfile در فولدر Spark-application پیدا کنید.

دستورات زیر برای بازکردن و چک کردن لیست فایل ها درون فولدر outfile استفاده می شوند :

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS

دستورات برای چک کردن فایل part-00000 :

$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

دستورات برای چک کردن فایل part-00001 :

$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

برای این که بیشتر با دستور "Spark-Submit" آشنا شوید، بخش بعد را حتما مطالعه کنید.


لطفا در راستای هرچه بهتر شدن کیفیت مطالب نظر خود را در رابطه با این سرفصل برای ما ارسال نمایید.