您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14816|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;1 G. Q% p2 L3 W& A' [
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    # G6 @  k# m2 D' J* t
  3. / Y) k( g9 E9 A1 s( y8 Y
  4. $worker = new Worker();) M4 B2 p( h* H: \% }
  5. // 4个进程
    ; N. u! y% v* J4 G3 d
  6. $worker->count = 4;! x6 e) @$ ]! _% ?/ L6 u) g
  7. // 每个进程启动后在当前进程新增一个Worker监听9 t0 d$ ^! o' x: E
  8. $worker->onWorkerStart = function($worker)" h( W+ _# ~, I1 {7 r
  9. {
    % L6 \0 V1 O$ a. n3 g) n
  10.     /**) M9 _( _2 e  m, U- n
  11.      * 4个进程启动的时候都创建2016端口的Worker  y$ g! Y5 A/ h+ ^) B
  12.      * 当执行到worker->listen()时会报Address already in use错误7 j" T) P7 ~+ ~( p
  13.      * 如果worker->count=1则不会报错. Q( Z) D# a2 s7 t
  14.      */
    : P  k0 t: B5 e6 g" r+ {
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    . @" G" t% p% y9 _  V1 |" H
  16.     $inner_worker->onMessage = 'on_message';; f" K% C$ {6 k, c  N
  17.     // 执行监听。这里会报Address already in use错误
    " i7 V' \& i& O7 ~. b  @2 x
  18.     $inner_worker->listen();! \- o( g$ ]# i6 I$ O
  19. };- m8 k; J5 D& u" }# k: ~3 `
  20. # Z/ D+ q) u" X7 F9 ^
  21. $worker->onMessage = 'on_message';
    ! Z5 R! [  H% B  \( C

  22. ( e  f( \, F0 B; w5 ]
  23. function on_message($connection, $data)
    $ n+ G# C* J2 }9 G# Y
  24. {1 G) `1 h# H$ e* ]0 N
  25.     $connection->send("hello\n");
    1 M$ U$ k" x5 ?  C' T" g# i
  26. }" {8 x4 @: p! |
  27. # j  a3 G5 x: k  e3 j
  28. // 运行worker7 s6 x( y% }$ [2 E7 V* l
  29. Worker::runAll();. I8 i3 v, X7 J9 a
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 H' k9 L5 W) Q, o" @1 @) }0 v$ E* W
  31. * `. n. H0 b2 a2 m
  32. use Workerman\Worker;1 o% O' s- y  h! e% ?
  33. require_once './Workerman/Autoloader.php';1 O$ H: b" ]9 l% ?, I
  34. / C4 D' G0 C: W3 r" K+ ]3 s
  35. $worker = new Worker('text://0.0.0.0:2015');1 a6 ~  E& S- [1 Z+ w2 }
  36. // 4个进程% s; s+ o, N$ a* A& [! M) ^
  37. $worker->count = 4;
    8 ]3 }) j$ H2 l- v' U
  38. // 每个进程启动后在当前进程新增一个Worker监听
    5 j6 N4 ^: b8 x, t9 ^( z; Q
  39. $worker->onWorkerStart = function($worker): K; ^( B1 R4 E1 m+ I, P
  40. {
    ( y, j$ Y" e9 d0 _& R
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');) H' S6 U7 S- k
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    3 i5 f. H' m- K9 _5 l* e* h
  43.     $inner_worker->reusePort = true;
    ( M2 J4 T+ j4 f' [; K% G
  44.     $inner_worker->onMessage = 'on_message';0 @4 i8 J& X" P% b8 ?6 j9 v. T
  45.     // 执行监听。正常监听不会报错# |0 m0 f% r( |
  46.     $inner_worker->listen();
    . d: v! o& ^5 W& P& ?
  47. };
    . a# L2 r' B, J3 G

  48. 5 a! r. t0 f/ S
  49. $worker->onMessage = 'on_message';1 F* Y. P! d( \8 m5 a
  50. 6 `  C1 O/ t* `0 X
  51. function on_message($connection, $data)- h9 t- i. M1 M6 p6 j0 H
  52. {5 {1 o/ a9 A7 I6 e
  53.     $connection->send("hello\n");) K# _8 o1 X) |: n
  54. }
    - `1 d; U, \# x; N$ I
  55. . O! k' X+ j2 Q* ]: u! Q3 `
  56. // 运行worker
    # P& [3 C. \3 Y# @3 }6 Z4 d, h$ {
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php8 v! m+ p. l  B& P' K
  2. use Workerman\Worker;4 K& `' Z1 z9 X; S" o
  3. require_once './Workerman/Autoloader.php';
    0 B7 M! d" C, t' h! R4 P% @! ~
  4. // 初始化一个worker容器,监听1234端口
    7 v- q4 w( ]8 K1 I
  5. $worker = new Worker('websocket://0.0.0.0:1234');5 @% b$ I' p8 j) W

  6. % _7 C5 s. E! _7 T2 J8 s
  7. /*
    4 e0 v, P: B9 ?/ t9 @
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误3 B& N! ?( N0 v( J% p: P
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)) f3 R! @2 C3 X( S, D% G
  10. */5 ^  G- c3 F$ g# T0 e6 N
  11. $worker->count = 1;1 ]$ ~% r" N8 _- Z7 ~
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口5 G2 p" V/ J, D6 P- T' a0 _, p
  13. $worker->onWorkerStart = function($worker)
    $ V: m% i  D+ Q+ Q; Y6 E
  14. {, k0 i! x7 \% n
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符# c) w( }5 ^1 u( L2 H
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    - d9 ^9 L, H/ W
  17.     $inner_text_worker->onMessage = function($connection, $buffer)3 \" J+ _8 z9 Q1 q3 U4 N
  18.     {
    7 I% l' e9 m6 O. Q
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据! T  u8 K" _2 G4 V+ e9 |5 u& b
  20.         $data = json_decode($buffer, true);
    . K3 Y* b8 }  W
  21.         $uid = $data['uid'];; |9 k( c+ I. r" a) g' _; X
  22.         // 通过workerman,向uid的页面推送数据7 g5 \! w  |, U# V+ a  d0 V
  23.         $ret = sendMessageByUid($uid, $buffer);% q' M9 l1 r( T) B9 g" C* C
  24.         // 返回推送结果5 F. `3 x: [" Q7 b) U
  25.         $connection->send($ret ? 'ok' : 'fail');4 `$ k1 q3 J' R5 M3 Z0 H7 I/ n
  26.     };
    9 A, L6 f  K6 Z* e' f2 p
  27.     // ## 执行监听 ##
    ; T  j) v1 S) }: v- @
  28.     $inner_text_worker->listen();
    1 H5 ?% W  h( ]- @6 J
  29. };- H5 A% [4 t2 S3 v' x( T
  30. // 新增加一个属性,用来保存uid到connection的映射+ |* }8 h9 T: E4 |2 ^
  31. $worker->uidConnections = array();
    ( t1 c9 Y; b2 t9 K3 D4 N, e; ~
  32. // 当有客户端发来消息时执行的回调函数! Q+ r9 [# N# \) [! |
  33. $worker->onMessage = function($connection, $data)# Y: w* R1 W4 l2 _! D- @1 O
  34. {
    $ p6 R3 m( T7 I. h7 Q
  35.     global $worker;; s8 U$ A- J$ U$ t, Y
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    " `0 ~# I5 S$ u% {" o. \( D
  37.     if(!isset($connection->uid))
    : x# |5 i; _7 D) K' _( T! a7 d' i! Q
  38.     {
    ( k+ F, f; g, L/ ]8 D( l
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 k5 |5 I  l9 \$ b9 H7 C
  40.        $connection->uid = $data;, k% o+ e6 _$ u+ H  U
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,2 Q! @& \; L. P$ U* u
  42.         * 实现针对特定uid推送数据
    " O" A$ i5 G! `
  43.         */
    1 n  N& C: r. |, n7 C) L$ y
  44.        $worker->uidConnections[$connection->uid] = $connection;
    9 }4 O. h7 V. e6 a3 P: w. d  c
  45.        return;
    4 Q' q2 w+ P% U0 {
  46.     }3 v" W7 \- Z5 i5 B
  47. };/ r: _9 E* ?. n9 `+ b# @! B, A
  48. , M$ l3 L, o: b+ L: s( Q: p
  49. // 当有客户端连接断开时
    ( y; J$ y; l3 M" \; x4 p
  50. $worker->onClose = function($connection)/ `; U3 j! Y3 o$ M! p$ g; D( m
  51. {
    . E7 M/ b: o2 E# G( B+ v
  52.     global $worker;5 J. C0 Q+ h2 S5 G/ X. A
  53.     if(isset($connection->uid))
    1 t# }1 Q  F) [6 x
  54.     {$ F& Y, V" p; j1 F, `! R# Y
  55.         // 连接断开时删除映射
    # u6 j  F% Q. M: R) q) |; ~& b
  56.         unset($worker->uidConnections[$connection->uid]);* N+ r  P, C; u6 _2 |: s
  57.     }
    0 r' e& m: m$ @& P2 d/ l
  58. };
    % {3 V. a8 V3 Q5 o6 t4 q
  59. . u" ?2 {% W# E5 ?
  60. // 向所有验证的用户推送数据
    $ W& u: ?7 \8 E0 `  T& S; T$ [. j
  61. function broadcast($message)
    2 U& j8 z: j) t8 G% e, L
  62. {! i2 P9 {* d( h+ Y0 K# u3 ]
  63.    global $worker;
    9 `6 _+ A4 z2 c' h% i6 a8 A
  64.    foreach($worker->uidConnections as $connection)
    0 _6 y) b0 ^, J
  65.    {/ M# ^8 f: `+ l& }: C$ L" Y
  66.         $connection->send($message);
    : `# n: e  a; b2 c- \
  67.    }
    9 G8 A+ K3 [1 s, d
  68. }
    % n% w; q- @# L+ K& x" H; e' @; s
  69. : S  h" _- K" f
  70. // 针对uid推送数据) K; l: R! Q8 K, d' j$ C
  71. function sendMessageByUid($uid, $message)
      K2 R4 h% |0 O( y  i5 d
  72. {
    # J7 X0 |1 @' _# D0 u# Y) k
  73.     global $worker;' w( m! Z$ ?+ i. q
  74.     if(isset($worker->uidConnections[$uid]))
    * l5 i2 [% j, v7 Q
  75.     {
    % v+ w" K4 @. |; F  Q: P
  76.         $connection = $worker->uidConnections[$uid];
    - J, h) o4 R& h* a
  77.         $connection->send($message);. L- O6 g$ _8 r5 K& B
  78.         return true;
    ( F: p1 N2 f  u. T5 @$ i& a
  79.     }  q, l/ ?  A) z% W8 v# A1 n# n6 R
  80.     return false;
    # o; K2 Y! @1 E1 T8 c
  81. }
    ! h: C* n$ c# ~7 x2 p( A% T; p; O

  82. 5 i2 \1 L6 O( X2 j- F
  83. // 运行所有的worker
    8 h  M" K. Y) T3 ]3 E
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ! C3 D& G) _5 a( N
  2. ws.onopen = function(){) Q, W5 h. Z, \2 x8 i8 n- \
  3.     var uid = 'uid1';4 v) g7 ?6 P1 u) G
  4.     ws.send(uid);  i' B2 _! U) w+ ]
  5. };
    ' R( B# g( l7 G' H
  6. ws.onmessage = function(e){: e; z6 Y1 N: S* J, r+ [1 o! k
  7.     alert(e.data);
    ! x! c1 M; X: y7 g5 x
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口. Y& x( ~6 B" x
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);4 C1 E7 d( A  h- D- p
  3. // 推送的数据,包含uid字段,表示是给这个uid推送3 Q. Z% q4 h; K: l2 D- E
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');+ Q' _' p- r6 A6 k
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    7 A9 ^4 Q3 h% A# j
  6. fwrite($client, json_encode($data)."\n");$ Z" T1 P, r0 z$ J* r, n. j
  7. // 读取推送结果
    9 L# V$ J  {/ s# E6 Y
  8. echo fread($client, 8192);
复制代码
! ^/ L) f8 R2 a5 o

7 ?6 n5 p' q1 f; b% ]# E
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 21:47 , Processed in 0.067734 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!