Question

I have installed NLTK and it works fine with the following code, which I am running in the pyspark shell:

>>> from nltk.tokenize import word_tokenize
>>> text = "Hello, this is testing of nltk in pyspark, mainly word_tokenize functions in nltk.tokenize, working fine with PySpark, please see the below example"
>>> text
//'Hello, this is testing of nltk in pyspark, mainly word_tokenize functions in nltk.tokenize, working fine with PySpark, please see the below example'
>>> word_token  = word_tokenize(text)
>>> word_token
//['Hello', ',', 'this', 'is', 'testing', 'of', 'nltk', 'in', 'pyspark', ',', 'mainly', 'word_tokenize', 'functions', 'in', 'nltk.tokenize', ',', 'working', 'fine', 'with', 'PySpark', ',', 'please', 'see', 'the', 'below', 'example']
>>>

When I try to run it through Spark's built-in map method, it throws the error ImportError: No module named nltk.tokenize

>>> from nltk.tokenize import word_tokenize
>>> rdd = sc.parallelize(["This is first sentence for tokenization", "second line, we need to tokenize using word_tokenize method in spark", "similar sentence here"])
>>> rdd_tokens = rdd.map(lambda sentence : word_tokenize(sentence))
>>> rdd_tokens
// PythonRDD[2] at RDD at PythonRDD.scala:43
>>> rdd_tokens.collect()

I am using Spark version 1.6.1 and Python version 2.7.9.

Full stack trace:

>>> from nltk.tokenize import word_tokenize
>>> rdd = sc.parallelize(["This is first sentence for tokenization", "second line, we need to tokenize using word_tokenize method in spark", "similar sentence here"])
>>> rdd_tokens = rdd.map(lambda sentence : word_tokenize(sentence))
>>> rdd_tokens
// PythonRDD[2] at RDD at PythonRDD.scala:43
>>> rdd_tokens.collect()
    16/05/17 17:06:48 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 16, spark-w-0.c.clean-feat-131014.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/lib/spark/python/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
    ImportError: No module named nltk.tokenize

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
			at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
			at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
			at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
			at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
			at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
			at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
			at org.apache.spark.scheduler.Task.run(Task.scala:89)
			at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    16/05/17 17:06:49 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
    16/05/17 17:06:49 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 23, spark-w-0.c.clean-feat-131014.internal): org.apache.spark.TaskKilledException
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:204)
			at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
			at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib/spark/python/pyspark/rdd.py", line 771, in collect
        port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 45, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 22, spark-w-0.c.clean-feat-131014.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/lib/spark/python/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
    ImportError: No module named nltk.tokenize

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
			at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
			at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
			at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
			at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
			at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
			at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
			at org.apache.spark.scheduler.Task.run(Task.scala:89)
			at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
			at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
			at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
			at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
			at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
			at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
			at scala.Option.foreach(Option.scala:236)
			at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
			at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
			at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
			at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
			at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
			at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
			at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
			at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
			at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
			at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
			at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
			at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
			at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
			at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
			at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/lib/spark/python/pyspark/worker.py", line 98, in main
        command = pickleSer._read_with_length(infile)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 164, in _read_with_length
        return self.loads(obj)
      File "/usr/lib/spark/python/pyspark/serializers.py", line 422, in loads
        return pickle.loads(obj)
    ImportError: No module named nltk.tokenize

        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
			at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
			at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
			at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
			at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
			at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
			at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
			at org.apache.spark.scheduler.Task.run(Task.scala:89)
			at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

    >>> 

Solution

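It looks like you installed NLTK only on the driver/gateway node and not on the worker nodes. The test you ran in the shell executes locally on the driver. Once you map a function through your SparkContext, it is distributed to the workers, which don't have NLTK installed. This matches the traceback: the ImportError is raised in pyspark/worker.py while the worker unpickles the mapped function, before any sentence is tokenized. Installing NLTK on every worker node, for the same Python interpreter that Spark uses there, should resolve it.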
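One way to confirm this diagnosis is to try the import inside Spark tasks and see which hosts fail. The following is a minimal sketch, not a definitive check: module_status and check_on_workers are hypothetical helper names, sc is assumed to be the SparkContext that the pyspark shell already provides, and num_tasks=20 is an arbitrary value that makes it likely, but not certain, that every executor runs at least one task.

```python
import importlib
import socket


def module_status(module_name):
    """Return (hostname, True/False): can module_name be imported on this machine?"""
    try:
        importlib.import_module(module_name)
        return (socket.gethostname(), True)
    except ImportError:
        return (socket.gethostname(), False)


def check_on_workers(sc, module_name, num_tasks=20):
    """Run module_status inside Spark tasks and collect the distinct results.

    sc is an existing SparkContext (assumed, e.g. the one from the pyspark shell).
    num_tasks only spreads the work; it does not guarantee every node is reached.
    """
    return (sc.parallelize(range(num_tasks), num_tasks)
              .map(lambda _: module_status(module_name))
              .distinct()
              .collect())
```

In the pyspark shell, check_on_workers(sc, "nltk") would return a list of (hostname, bool) pairs. Any host reported with False is a worker whose Python cannot import NLTK, while module_status("nltk") called directly shows the result for the driver.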

Licensed under: CC-BY-SA with attribution
Not affiliated with datascience.stackexchange