@@ -407,7 +407,7 @@ object DiscoveryService {
407
407
case false =>
408
408
initBond(peer).flatMap {
409
409
case Some (result) =>
410
- result.pongReceived.get
410
+ result.pongReceived.get.timeoutTo(config.requestTimeout, Task .pure( false ))
411
411
412
412
case None =>
413
413
Task (logger.debug(s " Trying to bond with $peer... " )) >>
@@ -433,6 +433,7 @@ object DiscoveryService {
433
433
_ <- completePong(peer, responded = false )
434
434
} yield false
435
435
}
436
+ .guarantee(stateRef.update(_.clearBondingResults(peer)))
436
437
}
437
438
}
438
439
@@ -563,7 +564,7 @@ object DiscoveryService {
563
564
564
565
waitOrFetch.flatMap {
565
566
case Left (wait) =>
566
- wait.get
567
+ wait.get.timeoutTo(config.requestTimeout, Task .pure( None ))
567
568
568
569
case Right (fetch) =>
569
570
val maybeEnr = bond(peer).flatMap {
@@ -729,39 +730,49 @@ object DiscoveryService {
729
730
case true =>
730
731
rpc
731
732
.findNode(peer)(target)
732
- .map(_.map(_.toList).getOrElse(Nil ))
733
+ .flatMap {
734
+ case None =>
735
+ Task (logger.debug(s " Received no response for neighbors for $target from ${peer.address}" )).as(Nil )
736
+ case Some (neighbors) =>
737
+ Task (logger.debug(s " Received ${neighbors.size} neighbors for $target from ${peer.address}" ))
738
+ .as(neighbors.toList)
739
+ }
733
740
.flatMap { neighbors =>
734
741
neighbors.filterA { neighbor =>
735
742
if (neighbor.address.checkRelay(peer))
736
743
Task .pure(true )
737
744
else
738
- Task (logger.debug(s " Ignoring neighbor $neighbor from $peer because of invalid relay IP. " )).as(false )
745
+ Task (logger.debug(s " Ignoring neighbor $neighbor from ${peer.address} because of invalid relay IP. " ))
746
+ .as(false )
739
747
}
740
748
}
741
749
.recoverWith {
742
750
case NonFatal (ex) =>
743
- Task (logger.debug(s " Failed to fetch neighbors of $target from $from : $ex" )).as(Nil )
751
+ Task (logger.debug(s " Failed to fetch neighbors of $target from ${peer.address} : $ex" )).as(Nil )
744
752
}
745
753
case false =>
746
- Task (logger.debug(s " Could not bond with $from to fetch neighbors of $target" )).as(Nil )
754
+ Task (logger.debug(s " Could not bond with ${peer.address} to fetch neighbors of $target" )).as(Nil )
747
755
}
748
756
}
749
757
750
758
// Make sure these new nodes can be bonded with before we consider them,
751
759
// otherwise they might appear to be be closer to the target but actually
752
760
// be fakes with unreachable addresses that could knock out legit nodes.
753
- def bondNeighbors (neighbors : Seq [Node ]): Task [Seq [Node ]] = {
754
- Task
755
- .parTraverseUnordered(neighbors) { neighbor =>
756
- bond(toPeer(neighbor)).flatMap {
757
- case true =>
758
- Task .pure(Some (neighbor))
759
- case false =>
760
- Task (logger.debug(s " Could not bond with neighbor candidate $neighbor" )).as(None )
761
+ def bondNeighbors (neighbors : Seq [Node ]): Task [Seq [Node ]] =
762
+ for {
763
+ _ <- Task (logger.debug(s " Bonding with ${neighbors.size} neighbors... " ))
764
+ bonded <- Task
765
+ .parTraverseN(config.kademliaAlpha)(neighbors) { neighbor =>
766
+ bond(toPeer(neighbor)).flatMap {
767
+ case true =>
768
+ Task .pure(Some (neighbor))
769
+ case false =>
770
+ Task (logger.debug(s " Could not bond with neighbor candidate $neighbor" )).as(None )
771
+ }
761
772
}
762
- }
763
- .map(_.flatten )
764
- }
773
+ .map(_.flatten)
774
+ _ <- Task (logger.debug( s " Bonded with ${bonded.size} neighbors out of ${neighbors.size} . " ) )
775
+ } yield bonded
765
776
766
777
def loop (
767
778
local : Node ,
@@ -775,18 +786,19 @@ object DiscoveryService {
775
786
.take(config.kademliaAlpha)
776
787
.toList
777
788
778
- if (contacts.isEmpty)
779
- Task .pure(closest)
780
- else {
781
- Task
782
- .parTraverseUnordered(contacts)(fetchNeighbors)
783
- .map(_.flatten.distinct)
784
- .flatMap(bondNeighbors)
785
- .flatMap { newNeighbors =>
786
- val newClosest = (closest ++ newNeighbors).take(config.kademliaBucketSize)
787
- val newAsked = asked ++ contacts
788
- loop(local, newClosest, newAsked)
789
- }
789
+ if (contacts.isEmpty) {
790
+ Task (logger.debug(s " Lookup finished for $target after asking ${asked.size} nodes. " )).as(closest)
791
+ } else {
792
+ Task (logger.debug(s " Lookup for $target contacting ${contacts.size} nodes. " )) >>
793
+ Task
794
+ .parTraverseUnordered(contacts)(fetchNeighbors)
795
+ .map(_.flatten.distinct)
796
+ .flatMap(bondNeighbors)
797
+ .flatMap { newNeighbors =>
798
+ val newClosest = (closest ++ newNeighbors).take(config.kademliaBucketSize)
799
+ val newAsked = asked ++ contacts
800
+ loop(local, newClosest, newAsked)
801
+ }
790
802
}
791
803
}
792
804
@@ -825,7 +837,9 @@ object DiscoveryService {
825
837
_ <- Task (
826
838
logger.info(s " Successfully enrolled with $enrolled bootstrap nodes. Performing initial lookup... " )
827
839
)
828
- _ <- lookup(nodeId)
840
+ _ <- lookup(nodeId).doOnFinish {
841
+ _.fold(Task .unit)(ex => Task (logger.error(s " Error during initial lookup " , ex)))
842
+ }
829
843
nodeCount <- stateRef.get.map(_.nodeMap.size)
830
844
_ <- Task (logger.info(s " Discovered $nodeCount nodes by the end of the lookup. " ))
831
845
} yield ()
0 commit comments