您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10488|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    $ O( q8 w( p8 w) k
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ' T; X' T% O3 f4 }9 b

  3. # z* J; z) Y4 d; s9 Q1 M5 r
  4. $worker = new Worker();
    7 v' o: x3 a: V. j3 s+ n1 X) ]# }
  5. // 4个进程
    & z1 J# l# H: @) F
  6. $worker->count = 4;
    $ E" ^5 a$ f% o* O) I9 }" p( q( {
  7. // 每个进程启动后在当前进程新增一个Worker监听
    # u5 Q. `* u+ `5 r  W/ n
  8. $worker->onWorkerStart = function($worker). l" u; A& D. e& G* Q/ S
  9. {2 B. U; h4 k. M$ q
  10.     /**
    / b4 G9 {8 @3 l" q
  11.      * 4个进程启动的时候都创建2016端口的Worker5 Z# B* G. S+ W/ p& D- H8 A
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ! ^9 u3 ]" }7 S4 ^: n& s  V
  13.      * 如果worker->count=1则不会报错
    ; U% u$ \, E$ Q: R: U
  14.      */! O$ q" \- R! J6 t2 P9 z) w
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    . {/ p& s; Z; ^5 J) |0 X) L( y7 O
  16.     $inner_worker->onMessage = 'on_message';/ Y1 H" Q1 R2 J: `3 F, ?: r( B
  17.     // 执行监听。这里会报Address already in use错误7 \0 m( n" t+ j; z; K" l4 {
  18.     $inner_worker->listen();
    . s, A( X+ q! a; }3 i  Q/ u& |
  19. };' [% O4 a! y% G* ]

  20. / R+ o* \8 Q6 S6 e/ [
  21. $worker->onMessage = 'on_message';: F# h6 q4 i4 F( f% Y: E

  22. % U8 \& ^" k/ B
  23. function on_message($connection, $data)
    5 z8 i" q* |+ }2 F7 `) j- V
  24. {. X* W" ]$ p+ ?
  25.     $connection->send("hello\n");
    ! l/ ^! B& C5 \$ M2 n) g1 S
  26. }, [- x; f( }/ @
  27. ; D# {  b& I2 Y: Q/ }, n$ i
  28. // 运行worker* _# W4 A7 p3 k) P/ L* b6 K8 |
  29. Worker::runAll();$ j, z" s" n1 _. g
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    7 J8 T) }& r& B) q9 y
  31. 0 C+ w/ D% o3 X% g
  32. use Workerman\Worker;4 b2 z: L, W! ~5 n
  33. require_once './Workerman/Autoloader.php';" l( `5 e9 O7 k$ A1 x

  34. $ L* `& A1 W  p0 `, w+ B3 f. h. K0 W4 ]
  35. $worker = new Worker('text://0.0.0.0:2015');
    1 L9 f, u( ]8 q# O
  36. // 4个进程" |9 C$ ~6 B' M! `
  37. $worker->count = 4;9 {0 f0 V( ?- u0 x
  38. // 每个进程启动后在当前进程新增一个Worker监听7 D, P7 X5 H' v4 N7 ^
  39. $worker->onWorkerStart = function($worker)
    , N8 g+ A6 F; p5 K  x2 ~2 p
  40. {8 E+ k2 d0 K! D  K0 X1 Y5 Q, P
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    7 \% ]3 \1 G- k9 h+ g7 R
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    2 r1 c7 q1 q( o9 J# _8 f
  43.     $inner_worker->reusePort = true;
    + y! h& _+ k2 q7 R- a3 F
  44.     $inner_worker->onMessage = 'on_message';) v/ F- B  f5 v) l: Y7 o, p
  45.     // 执行监听。正常监听不会报错
    6 E) n* q$ Z0 d8 t' b
  46.     $inner_worker->listen();
    - e" C, b$ q( q) P
  47. };
    1 i3 N- z4 z  Y7 b# f" z: [

  48. ( {7 F) L: x2 L( v( f. f
  49. $worker->onMessage = 'on_message';
    7 W! U' R9 Z' x/ s* F1 X
  50. / K" D. d! m& C7 v, F; y
  51. function on_message($connection, $data)) k( W8 X2 M3 o: F; n. W# Y
  52. {# L9 L# F8 c2 s& {
  53.     $connection->send("hello\n");
    / p+ F6 c' h& ?0 D" }* r
  54. }
    6 B5 ?" X$ g0 O, [

  55. * O- C: L% K, Y8 c2 R! l* ^& V8 ]
  56. // 运行worker: j, O+ b1 S: x+ _: d
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    & f% E" o3 ?  d/ Y
  2. use Workerman\Worker;
    / ^( x$ x8 {" L( O
  3. require_once './Workerman/Autoloader.php';
    3 }" F( U$ @7 g1 n( P. r7 i% X
  4. // 初始化一个worker容器,监听1234端口5 Z# u# A; A  m1 L7 ?. v
  5. $worker = new Worker('websocket://0.0.0.0:1234');5 G5 }5 z5 J" P5 t
  6. # q, u7 N4 d" r$ s9 p) d
  7. /*
    9 {" n0 J6 w! g: @
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    % G! W8 e7 P8 d
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    2 D" c( |0 \9 y7 L
  10. */
    , n; m5 q1 H% q: w
  11. $worker->count = 1;0 ?& R5 E) g+ v. P" E% Y9 o
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    ( ]$ h; V# a3 O6 p6 c; L1 `
  13. $worker->onWorkerStart = function($worker)' J7 q/ r, k, b, G7 ~8 U
  14. {
    / Y- x0 k3 c2 U8 k
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
      w' c9 g+ [5 t9 t7 s) g
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    " v6 v: R- O2 H: F
  17.     $inner_text_worker->onMessage = function($connection, $buffer)3 w( W% K! Y& ?( O5 s1 E$ r/ K
  18.     {
    4 X. ]5 ^9 r/ S( H7 V7 e( s
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    + S, B/ P/ m+ ^
  20.         $data = json_decode($buffer, true);& t" T/ t9 k& B4 x
  21.         $uid = $data['uid'];
    $ U' T0 z$ O" ^" b( W0 @
  22.         // 通过workerman,向uid的页面推送数据
    6 X. G" u" f0 d) S" C  s8 Q# n
  23.         $ret = sendMessageByUid($uid, $buffer);
    3 h" O, O4 l  S1 L+ A( Y. K
  24.         // 返回推送结果+ J& F; \5 ~$ S/ N# I+ q, `
  25.         $connection->send($ret ? 'ok' : 'fail');
    / e( r7 Q$ Y0 R$ L+ L
  26.     };) R. D- U, L! V0 K# u7 {  k
  27.     // ## 执行监听 ##5 R& {/ y4 x% I! P% X3 ^& l
  28.     $inner_text_worker->listen();
    # h2 o& Z' l# ^
  29. };: q* ^6 y) m. @3 C/ @9 |8 \
  30. // 新增加一个属性,用来保存uid到connection的映射
    " x- [* |  j7 ^5 U. Z! R5 a! _0 ?
  31. $worker->uidConnections = array();
    & U7 l8 k4 D# {
  32. // 当有客户端发来消息时执行的回调函数
    # h1 u1 N$ F  L, @- Y* f1 s% n
  33. $worker->onMessage = function($connection, $data)& \; `- l" K4 P4 ?: t9 B, _: g: @3 O! r
  34. {" M  a) @) Z9 k) N, X
  35.     global $worker;+ P3 s6 o, K4 W8 O8 ]
  36.     // 判断当前客户端是否已经验证,既是否设置了uid! _; z/ A0 X; t/ \6 _& l% @
  37.     if(!isset($connection->uid))
    ; \7 E; e6 N& T* m; `0 z9 C
  38.     {7 O$ K, J9 a3 O" A( [
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)$ f: b* V8 r+ e2 e. }
  40.        $connection->uid = $data;
    3 ?; X  b5 B/ S4 f* o  z% l, U
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,: z. n  @( r/ y& P
  42.         * 实现针对特定uid推送数据- C( h$ C$ A( U- |4 @# O
  43.         */6 R  x3 ^" X: o# F0 ]
  44.        $worker->uidConnections[$connection->uid] = $connection;
    8 _% `; V! r5 U3 p
  45.        return;9 Z8 J( u# S9 k+ W! f5 u& ?
  46.     }- k! ~, C# z2 P( a" Y) L6 M
  47. };3 K, f' A4 q- h* W) s, H7 K
  48. 6 I! h' ?, @  y0 x+ ]
  49. // 当有客户端连接断开时
    + K# f+ X9 y  W! d2 W$ t% G7 O# `
  50. $worker->onClose = function($connection)
    5 v) q$ I4 Y9 J7 q) f8 T
  51. {
    ; g: f6 Q2 }: p- ?3 l
  52.     global $worker;
    & f$ ^) V, b; D& r
  53.     if(isset($connection->uid))
    ; }; F8 f$ u  b- ]$ `
  54.     {! x5 D9 H# M0 i& M  Y( @
  55.         // 连接断开时删除映射- b" \% J/ Y. u
  56.         unset($worker->uidConnections[$connection->uid]);
    - h6 I! s; K  w1 N- D" P1 C3 b
  57.     }
    2 x' y' [. i$ g1 I5 i
  58. };
    4 h* o6 N! ~+ M3 n0 n& s

  59. ( i9 m" C& d( Y9 a: W: {# g
  60. // 向所有验证的用户推送数据  k( q0 c" w/ K3 T
  61. function broadcast($message)0 Q; a6 a+ R: T8 z& F
  62. {
    9 P/ x7 \. B7 A/ E
  63.    global $worker;
      B; \) B3 w+ x* ~! }5 n5 ]
  64.    foreach($worker->uidConnections as $connection)
    ( f) b5 F$ p$ y) ?8 q4 {
  65.    {$ O* Y- ^2 s2 H, [4 o
  66.         $connection->send($message);9 M4 s% R3 Y- R# P9 X
  67.    }) c. V2 ]* s& \  X
  68. }: i3 \) ?1 L( N7 d

  69. + _& U7 V: |8 T6 _# Q. K
  70. // 针对uid推送数据
    % f/ \( W- L) r
  71. function sendMessageByUid($uid, $message)$ y# K( b7 ?5 }$ v
  72. {
    & n1 y% R; k/ |2 \( Y% a3 j
  73.     global $worker;2 V4 T9 _  W) ^7 e- a+ {. R2 U# Q1 [$ b
  74.     if(isset($worker->uidConnections[$uid]))
    % A" x$ ^# k- W" J
  75.     {; e, O7 ^( u' j7 p: `/ D2 W" V* m
  76.         $connection = $worker->uidConnections[$uid];% i3 J& ]$ E; q4 u" r
  77.         $connection->send($message);
    , u7 r) a* w5 e3 f' B; ~) |9 g& \8 j
  78.         return true;
    ; K9 z- o+ v( h/ t, e9 U* }6 c& r
  79.     }; e: B% |9 G& I
  80.     return false;7 C! Z6 C  ?$ D7 z0 t; u! z
  81. }
    4 A, U+ q0 ^6 H6 ^. n7 g: m
  82. 7 F- c6 k/ t& E+ H  }& B* j
  83. // 运行所有的worker
    * H" r" v  j" V/ P$ W$ n! H- @3 f( N( I
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ' E) E5 j2 I' P2 O
  2. ws.onopen = function(){
    , z5 }6 q3 D1 G
  3.     var uid = 'uid1';
    - F4 H& J( V8 @+ T5 ]0 k
  4.     ws.send(uid);
    8 ?' e! }) V( l7 R
  5. };5 r( |4 E/ B8 A
  6. ws.onmessage = function(e){
    $ K# V6 T* S& O6 ^  o! ?& e
  7.     alert(e.data);
    ! j( D$ B3 P. ^2 s. j
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    , N3 s+ N# z; F" o3 i& x
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);4 g$ a% l. d$ H
  3. // 推送的数据,包含uid字段,表示是给这个uid推送( b. A9 q9 B. t# Y5 D
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    2 l! Z$ Y* O) ^: {4 n
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    - t* j+ C7 P/ F: J) v1 A
  6. fwrite($client, json_encode($data)."\n");% C8 a. ~6 Q5 _" E
  7. // 读取推送结果% ~0 t% z8 k  z; P0 ]5 B
  8. echo fread($client, 8192);
复制代码
" D! Y5 [6 H; R

% B0 N  g/ o2 k4 [" e) v, `* }
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-17 11:46 , Processed in 0.119095 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!